站长之家特效网站,淘宝怎么优化关键词排名,湖北交投建设集团网站,网站制作多少钱啊微批处理#xff08;Micro-Batching#xff09;
微批处理是 Structured Streaming 默认的处理模型。
微批处理 (Micro-batching):
在微批处理模型中#xff0c;实时数据流被分割成小的批次。这些批次按顺序处理#xff0c;每个批次处理像一个小的批处理作业。处理完一个…微批处理Micro-Batching
微批处理是 Structured Streaming 默认的处理模型。
微批处理 (Micro-batching):
在微批处理模型中实时数据流被分割成小的批次。这些批次按顺序处理每个批次处理像一个小的批处理作业。处理完一个批次后结果被输出然后处理下一个批次。这意味着会有一个小的延迟等于批次的大小因为系统需要等待整个批次处理完毕才输出结果。微批处理模型中通常有一个处理周期的概念即系统以固定的时间间隔处理数据。
优点:
容错性: 基于 Spark 的容错机制可以容易地恢复状态和输出。简单性: 开发人员可以使用与批处理相同的API进行流处理降低了学习曲线。集成性: 可以与Spark的其他组件如MLlib、Spark SQL无缝集成。
缺点:
延迟性: 因为处理是按批次进行的所以有固有的延迟通常是秒级。
持续处理Continuous Processing
持续处理是 Structured Streaming 在 Spark 2.3 版本中引入的实验性功能。在这种模型中实时数据流被视为连续的记录流Spark 引擎以较低的延迟毫秒级持续处理每条记录。
持续处理 (Continuous Processing):
持续处理模型中数据是随着其到达即时处理的。没有将数据分批处理而是持续不断地处理流入的数据。这种模式可以减少延迟因为数据一到达就开始处理不必等待。持续处理模型通常能够提供更低的端到端延迟但可能需要更复杂的管理状态和容错机制。
优点:
低延迟: 可以实现毫秒级的处理延迟适用于对延迟敏感的应用。高吞吐: 由于不需要划分批次可以连续不断地处理数据提高了吞吐量。
缺点:
复杂性: 相对于微批处理需要更复杂的容错机制。成熟度: 这是一个较新的功能可能不如微批处理稳定。
这两种模型可以用以下表格进行比较
特性微批处理持续处理处理延迟秒级毫秒级容错性高中到高API一致性与批处理一致与批处理一致成熟度高低吞吐量高高复杂性低高状态管理容易较复杂与其他Spark组件集成无缝无缝
在选择模型时需要根据具体的应用场景、延迟要求和资源情况来决定使用哪种模型。如果应用可以容忍秒级的延迟微批处理是一个成熟且简单的选择。如果应用需要极低的延迟可以尝试使用持续处理模型。
微批处理模型中“写日志”通常是指在处理批次之前记录其信息以便于故障恢复。而在持续处理模型中“写日志”可能更多地关联于实时记录每个事件或数据项的处理状态。
编写Structured Streaming程序
导入pyspark模块
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
#如果直接使用pyspark交互就不需要导入但是如果是自己编写python就需要导入模块创建SparkSession对象:
在任何Spark应用程序中第一步是创建一个SparkSession对象。这是Structured Streaming编程的入口点。
from pyspark.sql import SparkSessionspark SparkSession \.builder \.appName(Structured Streaming App) \ .getOrCreate()
spark.sparkContext.setLogLevel(warn)稍微讲解一下appName得是被唯一标识的spark.sparkContext.setLogLevel(“warn”)是设置日志显示级别无关紧要的就不输出
定义输入源:
定义输入数据源以及如何读取数据。Structured Streaming支持多种输入源如Kafka、文件系统、Socket等。
df spark \.readStream \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(subscribe, topic1) \.load()
定义转换:
对数据流进行转换处理比如选择需要的字段、进行聚合等操作。这是需要我们自己定义如何操作的
from pyspark.sql.functions import col, windowwords df.selectExpr(CAST(value AS STRING))
wordCounts words.groupBy(window(col(timestamp), 10 minutes, 5 minutes),col(word)
).count()定义输出接收器:
定义如何输出处理后的数据流。输出可以写入到多种外部存储系统中如文件系统、数据库、控制台等。
query wordCounts \.writeStream \.outputMode(complete) \.format(console) \.start()启动流处理:
最后启动流处理。启动后Spark会持续运行处理实时进入的数据。
query.awaitTermination()监控和异常处理:
你可以监控流处理的进度和性能以及添加异常处理逻辑。
try:query.awaitTermination()
except KeyboardInterrupt:query.stop()输入源
file源
Structured Streaming中的文件源允许你监视指定目录中的新文件并从中读取数据。这里是一些常见的选项
path: 需要监控的目录路径。maxFilesPerTrigger: 每个触发器处理的最大文件数。这个选项可以限制在每个触发器批次中应该读取的文件数量有助于控制流处理的速率。latestFirst: 是否先处理最新的文件。设置为true会首先处理最新的文件这可能对某些实时性要求较高的应用程序有用。fileFormat: 文件的格式如json、csv、parquet等。
# 创建一个DataFrame表示从目录/path/to/directory中连续读取的数据
csvDF spark \.readStream \.option(sep, ,) \.schema(userSchema) \ # 可以定义一个schema.csv(/path/to/directory)# 启动流查询输出模式为追加模式并将结果输出到控制台
query csvDF.writeStream \.outputMode(append) \.format(console) \.start()query.awaitTermination()
在这个例子中我们首先使用readStream来创建一个DataFrame读取流。我们通过.option(sep, ,)指定了CSV值之间的分隔符为逗号。schema(userSchema)部分定义了CSV文件的结构你需要在代码中提前定义userSchema。
然后我们指定了监视的目录路径。csv(/path/to/directory)表示我们希望读取的文件类型是CSV。
最后我们定义了一个查询该查询将输出模式设置为append这意味着仅将新的数据行附加到结果中。我们使用.format(console)将输出结果发送到控制台这对于调试和开发是很有用的。start()方法启动流查询而awaitTermination()方法则是让应用程序等待流处理的终止以进行长时间运行。
kafka源
在Structured Streaming中Kafka源允许你从Kafka主题读取数据流。这里是一些常见的Kafka源选项
kafka.bootstrap.servers: Kafka集群的地址列表通常是host1:port1,host2:port2的形式。subscribe: 要订阅的Kafka主题的名称或用逗号分隔的多个主题的列表。startingOffsets: 指定流应从何处开始读取。它可以是latest默认值“earliest”或是JSON字符串指定每个主题的分区起始偏移量。endingOffsets: 流查询终止时的偏移量。仅在批处理查询中使用。failOnDataLoss: 是否在数据丢失例如Kafka主题被删除时使查询失败。默认为true。
下面是一个Kafka源的操作示例它从Kafka主题读取数据流并将其加载为DataFrame
# 创建一个DataFrame表示从名为updates的Kafka主题读取的数据
kafkaDF spark \.readStream \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(subscribe, updates) \.option(startingOffsets, latest) \.load()# 选择我们需要的字段并将value字段从字节转换为字符串
selectedDF kafkaDF.selectExpr(CAST(key AS STRING), CAST(value AS STRING))# 启动流查询将结果输出到控制台
query selectedDF.writeStream \.outputMode(append) \.format(console) \.start()query.awaitTermination()在这个例子中我们使用.format(kafka)来告诉Spark我们正在使用Kafka源并通过option来设置Kafka的相关参数。.load()方法加载Kafka主题为DataFrame。
以下是.option()方法设置的参数的解释
kafka.bootstrap.servers: 指定Kafka集群的服务器地址及其端口。这个设置是必须的因为它告诉Spark Streaming在哪里可以找到Kafka集群。格式是一个逗号分隔的主机和端口对的列表例如“host1:port1,host2:port2”。这里的host和port分别对应Kafka服务器的IP地址和监听端口。subscribe: 这个选项用于指定一个或多个Kafka主题来订阅。只要这些主题有数据写入Spark Streaming就会读取这些数据。在这个例子中updates是你想要订阅的Kafka主题的名称。startingOffsets: 定义当你的Spark应用第一次启动并且没有设置偏移量的时候它应该从Kafka主题的哪里开始读取数据。latest表示只读取启动应用程序后生成的数据而earliest表示从可用的最早的数据开始读取。你还可以指定一个JSON字符串来表示每个主题的每个分区的确切开始偏移量。
selectExpr是一个转换操作它允许你运行SQL表达式。在这里我们将key和value列从字节转换为字符串类型便于阅读和处理。
查询的其余部分和之前的例子类似这次我们也是以追加模式输出到控制台并启动流查询。
请注意你需要有一个运行中的Kafka集群并已经创建了相关的主题以及在Spark集群上配置了适当的Kafka依赖。
socket源
在Structured Streaming中使用socket源意味着数据流将来自于一个TCP套接字连接。这是最简单的流式数据源之一通常用于测试和原型设计阶段。它允许您从通过TCP连接发送的数据流中读取文本数据。
以下是如何在Structured Streaming中设置socket源
from pyspark.sql import SparkSession# 初始化SparkSession
spark SparkSession.builder \.appName(StructuredSocketRead) \.getOrCreate()# 创建流式DataFrame连接到指定的socket
lines spark.readStream \.format(socket) \.option(host, localhost) \.option(port, 9999) \.load()# 使用DataFrame API进行数据处理
# ...# 启动流查询
query lines.writeStream \.outputMode(append) \.format(console) \.start()query.awaitTermination()上面的代码片段执行了以下操作
通过SparkSession.builder初始化了一个SparkSession。使用readStream方法创建了一个流式DataFrame它将会连接到在localhost上的9999端口监听的TCP套接字。通过format(socket)指定了数据源格式为socket。使用.option(host, localhost)和.option(port, 9999)设置了监听的主机地址和端口号。.load()方法触发了对socket源的连接。之后可以在lines DataFrame上应用各种转换操作如过滤、选择、聚合等。writeStream定义了如何输出处理后的流数据这里通过.format(console)指定了输出到控制台。.start()开始接收数据并处理.awaitTermination()方法让程序持续运行直到手动停止或者遇到错误。
使用socket源进行Structured Streaming是一个好方法可以实时测试您的流处理逻辑因为您可以很容易地通过如netcat之类的工具来发送数据。
然后在另外一个打开虚拟机另外窗口输入
nc -lk 你的端口号rate源
在Structured Streaming中rate源每秒生成特定的数据行两个列的数据流timestamp和value。这个源非常适合生成简单的数据流进行测试和调试。
每个输出行包含一个timestamp和value。timestamp是每个触发器触发时的当前时间戳而value是从程序开始以来的触发器触发的次数。
下面是如何在Structured Streaming中设置rate源的例子
from pyspark.sql import SparkSession# 初始化SparkSession
spark SparkSession.builder \.appName(StructuredRateRead) \.getOrCreate()# 创建流式DataFrame用rate源
df spark.readStream \.format(rate) \.option(rowsPerSecond, 1) \.load()# 使用DataFrame API进行数据处理
# ...# 启动流查询输出到控制台
query df.writeStream \.outputMode(append) \.format(console) \.start()query.awaitTermination()上面的代码片段执行了以下操作
使用SparkSession.builder初始化了一个SparkSession。使用readStream方法创建了一个流式DataFrame它将会使用rate源。通过.option(rowsPerSecond, 1)设置每秒生成的行数。.load()方法触发了对rate源的连接。使用.writeStream定义了如何输出处理后的流数据。通过.format(console)指定了输出到控制台。.start()开始接收数据并处理.awaitTermination()方法让程序持续运行直到手动停止或者遇到错误。
rate源非常适合开发和测试时生成连续的、预测的数据流但它不适用于生产环境因为它不是从实际的数据源读取数据。
可以使用option方法来配置这个源的行为。以下是rate源的一些常见选项和它们的功能 rowsPerSecond指定每秒生成的行数。这个选项可以帮助你控制数据生成的速率。 示例.option(rowsPerSecond, 10) 表示每秒生成10行数据。 rampUpTime在指定时间内逐渐增加到rowsPerSecond指定的速率。这通常用于模拟数据源在启动时从没有数据到达指定速率的过渡过程。 示例.option(rampUpTime, 1m) 在1分钟内逐渐增加生成的数据行数。 numPartitions指定生成的数据将在多少个分区中分布。这可以帮助模拟并行数据流的情况。 示例.option(numPartitions, 2) 表示数据将分布在两个分区中。
使用这些选项的例子
df spark.readStream \.format(rate) \.option(rowsPerSecond, 100) \.option(rampUpTime, 5s) \.option(numPartitions, 2) \.load()这将会创建一个数据流初始时每秒100行数据5秒内逐渐增加到这个速率并且数据在两个分区中分布。
接收器
Structured Streaming中的输出接收器Sink是指数据流最终输出到的地方。Spark Structured Streaming提供了多种不同的输出接收器以支持将数据流输出到各种外部系统和格式。以下是一些常见的输出接收器 文件接收器File Sink输出数据到文件系统。支持的格式包括文本、JSON、CSV、Parquet等。可以指定文件输出目录和文件格式。 Kafka接收器Kafka Sink输出数据到Kafka主题。可以指定Kafka服务器的地址和要写入的主题。 控制台接收器Console Sink将数据输出到控制台主要用于调试和开发。 内存接收器Memory Sink输出数据到内存表中允许在内存中查询数据。这主要用于快速测试和原型开发。 Foreach接收器提供了一个通用接口允许你对数据流中的每个记录执行任意操作。这可以用于实现自定义的输出逻辑例如写入自定义外部存储或调用外部API。
使用Structured Streaming的输出接收器时你需要指定输出模式如append、“complete或update”输出接收器类型如console、“kafka”、file等以及任何必要的配置选项。
例如将数据流输出到控制台的代码示例
query df.writeStream \.outputMode(append) \.format(console) \.start()将数据流输出到Kafka的代码示例
query df.writeStream \.outputMode(update) \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(topic, updates) \.start()这些代码示例展示了如何将数据流输出到不同类型的接收器。每种类型的接收器可能有不同的配置选项和限制所以在使用时需要查阅具体的文档。