自学内容网 自学内容网

Python版Spark Structured Streaming编程指南

Structured Streaming中文参考指南

一、概述

Structured Streaming是构建在Spark SQL引擎之上的可扩展且容错的流处理引擎。用户可以像处理静态数据的批处理计算一样表达流计算,Spark SQL引擎会持续增量地运行计算,并在流数据不断到达时更新最终结果。用户可以使用Scala、Java、Python或R中的Dataset/DataFrame API来表达流聚合、事件时间窗口、流到批处理的连接等操作,计算在优化的Spark SQL引擎上执行,系统通过检查点和预写日志确保端到端的精确一次容错保证。

内部默认情况下,Structured Streaming查询使用微批处理引擎处理,将数据流作为一系列小批量作业处理,实现低至100毫秒的端到端延迟和精确一次容错保证。从Spark 2.3开始,引入了新的低延迟处理模式——连续处理模式,可实现低至1毫秒的端到端延迟和至少一次保证,用户可根据应用需求选择模式。

二、快速示例

以维护从监听TCP套接字的数据服务器接收的文本数据的实时单词计数为例,展示如何使用Structured Streaming实现。

  1. 导入必要的类并创建本地SparkSession
    • 在Python中,使用如下代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
  .builder \
  .appName("StructuredNetworkWordCount") \
  .getOrCreate()
  1. 创建表示从服务器接收的文本数据流的DataFrame并进行转换计算单词计数
    • 创建监听本地9999端口的DataFrame:
lines = spark \
  .readStream \
  .format("socket") \
  .option("host", "localhost") \
  .option("port", 9999) \
  .load()
  • 将每行文本拆分为单词:
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word"))
  • 计算每个单词的出现次数:
wordCounts = words.groupBy("word").count()
  1. 启动流式查询并将结果输出到控制台
query = wordCounts \
  .writeStream \
  .outputMode("complete") \
  .format("console") \
  .start()
query.awaitTermination()

执行代码后,启动Netcat作为数据服务器(nc -lk 9999),在另一个终端运行示例代码,在Netcat终端输入的文本会被实时计数并每秒在屏幕上打印结果。

三、编程模型

  1. 基本概念
    • 将实时数据流视为不断追加的表,对流的查询生成“结果表”。每经过一个触发间隔(如每秒),新数据作为新行追加到输入表,进而更新结果表,结果表更新时将变化的结果行写入外部存储(输出)。输出模式有:
      • Complete Mode(完整模式):整个更新后的结果表将被写入外部存储,由存储连接器决定如何处理整个表的写入。
      • Append Mode(追加模式):仅将自上次触发以来在结果表中追加的新行写入外部存储,适用于结果表中现有行不期望改变的查询。
      • Update Mode(更新模式):仅将自上次触发以来在结果表中更新的行写入外部存储(从Spark 2.1.1开始可用),与完整模式不同,此模式仅输出自上次触发以来更改的行,若查询不包含聚合,等效于追加模式。
  2. 处理事件时间和延迟数据
    • 事件时间是嵌入在数据本身中的时间,在模型中,每个事件是表中的一行,事件时间是行中的列值,基于事件时间的窗口聚合(如每分钟的事件数量)可通过对事件时间列的分组和聚合来实现,与处理静态数据集类似。同时,模型能自然处理晚于预期到达的数据,Spark可更新旧聚合,并通过水印(watermarking)机制(从Spark 2.1开始支持)指定延迟数据的阈值,清理旧的聚合状态,限制中间状态数据的大小。
  3. 容错语义
    • Structured Streaming旨在提供端到端精确一次语义,通过可靠跟踪处理进度来处理故障。每个流数据源都有偏移量(类似Kafka偏移量或Kinesis序列号),引擎使用检查点和预写日志记录每个触发中处理的数据偏移范围,流接收器设计为幂等以处理重新处理,从而确保在任何故障下的端到端精确一次语义。

四、使用Datasets和DataFrames的API

  1. 创建流式DataFrames和流式Datasets
    • 流式DataFrames可通过SparkSession.readStream()返回的DataStreamReader接口创建(在R中使用read.stream()方法),与创建静态DataFrame的read接口类似,可指定源的详细信息(如数据格式、模式、选项等)。
    • 内置源包括:
      • 文件源:读取目录中的文件作为数据流,支持多种文件格式(如文本、CSV、JSON、ORC、Parquet等),文件按修改时间顺序处理(可设置latestFirst反转顺序),支持设置maxFilesPerTriggerfileNameOnlymaxFileAge等选项,还可设置完成文件的清理方式(如archivedeleteoff)。
      • Kafka源:从Kafka读取数据,兼容Kafka 0.10.0或更高版本,详见Kafka Integration Guide
      • 套接字源(用于测试):从套接字连接读取UTF8文本数据,监听服务器套接字在驱动程序端,不提供端到端容错保证。
      • 速率源(用于测试):按指定每秒行数生成数据,包含timestamp(消息分发时间)和value(消息计数)列。
    • 一些源不是容错的,使用时需注意。默认情况下,基于文件的结构化流要求指定模式,若临时使用,可设置spark.sql.streaming.schemaInferencetrue启用模式推断。分区发现会在存在命名的子目录时自动递归,用户提供的模式中的分区列将由Spark根据读取文件的路径填充。
  2. 对流式DataFrames/Datasets的操作
    • 支持对流式DataFrames/Datasets进行各种操作,如选择、投影、聚合、窗口操作、连接操作、去重等,类似于静态DataFrames/Datasets的操作,但部分操作有差异或限制。
    • 基本操作 - 选择、投影、聚合:大多数常见的DataFrame/Dataset操作支持流式处理,如选择特定列、根据条件过滤、分组聚合等,也可将流式DataFrame/Dataset注册为临时视图并应用SQL命令。
    • 窗口操作基于事件时间:通过groupBy()window()操作可实现基于滑动事件时间窗口的聚合,类似于分组聚合,聚合值根据事件时间落入的窗口维护。同时,通过withWatermark()可定义水印来处理延迟数据,指定事件时间列和延迟阈值,在更新输出模式下,引擎会根据水印清理旧状态,确保聚合结果的准确性;在追加输出模式下,引擎等待延迟数据被计算,然后追加最终计数到结果表/接收器。使用水印时需注意满足一定条件(如输出模式为追加或更新、聚合必须包含事件时间列或基于事件时间列的窗口、withWatermark必须在聚合前调用且在同一列上)。
    • 连接操作
      • 流 - 静态连接:从Spark 2.0开始支持流式与静态DataFrame/Dataset之间的连接(内连接和某些类型的外连接),连接结果增量生成,流 - 静态连接无状态,无需状态管理,但某些类型的外连接不支持。
      • 流 - 流连接:从Spark 2.3开始支持,由于两个数据流的视图在任何时候都是不完整的,因此需要缓冲过去的输入作为流状态,以便匹配未来输入并生成连接结果,同时可通过水印和事件时间约束自动处理延迟和无序数据,限制状态大小。支持内连接(可选水印)和外连接(必须指定水印和事件时间约束),连接结果生成方式与内连接类似,但外连接的NULL结果生成会有延迟,取决于水印延迟和时间范围条件,且在当前微批处理引擎实现中,水印在微批处理结束时推进,若输入流一段时间未接收数据,外连接输出可能会延迟。截至Spark 2.4,连接仅在追加输出模式下支持,且在连接前不能使用其他非映射类操作(如流式聚合、在更新模式下的mapGroupsWithStateflatMapGroupsWithState)。
    • 流式去重:可使用事件中的唯一标识符对数据流中的记录进行去重,与静态去重类似,可选择是否使用水印。若有重复记录到达的上限,可定义水印和事件时间列进行去重,查询会根据水印清理旧状态数据;若没有上限,则存储所有过去记录的数据作为状态。
    • 处理多个水印的策略:一个流式查询可能有多个输入流进行联合或连接,每个输入流可使用withWatermark指定不同的延迟数据阈值,执行查询时,Structured Streaming会跟踪每个输入流的最大事件时间,计算水印并选择一个全局水印用于有状态操作。默认选择最小值作为全局水印以确保数据不被意外丢弃,但从Spark 2.4开始,可设置spark.sql.streaming.multipleWatermarkPolicymax选择最大值作为全局水印,使全局水印以最快流的速度移动,但会导致较慢流的数据被积极丢弃,需谨慎使用。
    • 任意有状态操作:从Spark 2.2开始,可使用mapGroupsWithStateflatMapGroupsWithState操作在分组数据集上应用用户定义代码来更新用户定义状态,用于处理比聚合更高级的有状态操作(如会话跟踪)。用户定义的状态函数应根据输出模式的语义实现,否则可能导致正确性问题,可通过将流式查询拆分为多个查询来解决。
    • 不支持的操作:流式DataFrames/Datasets不支持一些操作,如多个流式聚合(在流式数据集上的聚合链)、limittake前N行、distinct操作(在流式数据集上)、在非聚合后进行排序操作(除非在完整输出模式下且在聚合后)、某些类型的流 - 流外连接等,一些Dataset方法(如count()foreach()show())在流式数据集上也不适用,因为它们会立即运行查询并返回结果,这在流式数据集中没有意义。
  3. 启动流式查询
    • 使用Dataset.writeStream()返回的DataStreamWriter接口启动流式计算,需指定以下一个或多个内容:
      • 输出接收器详细信息:如数据格式、位置等。
      • 输出模式:指定写入输出接收器的内容,不同类型的查询支持不同的输出模式(如查询类型与支持的输出模式的兼容性矩阵所示)。
      • 查询名称:可选,用于标识查询的唯一名称。
      • 触发间隔:可选,指定触发间隔,若未指定,系统在前一处理完成后立即检查新数据可用性,若错过触发时间,系统会在前一处理完成后立即触发处理。
      • 检查点位置:对于某些可保证端到端容错的输出接收器,指定系统写入所有检查点信息的位置,应在HDFS兼容的容错文件系统中的目录。
    • 输出模式包括:
      • Append Mode(追加模式,默认):仅输出自上次触发以来在结果表中添加的新行,适用于结果表中添加的行不会改变的查询,保证每行仅输出一次(假设接收器是容错的)。
      • Complete Mode(完整模式):每次触发后将整个结果表输出到接收器,支持聚合查询。
      • Update Mode(更新模式):从Spark 2.1.1开始可用,仅输出自上次触发以来在结果表中更新的行。
    • 输出接收器类型包括:
      • 文件接收器:将输出存储到目录中,支持多种文件格式(如parquetorcjsoncsv等),支持写入分区表(按时间分区可能有用),是容错的(精确一次)。
      • Kafka接收器:将输出存储到Kafka的一个或多个主题中,支持至少一次的容错保证,详见Kafka Integration Guide
      • Foreach接收器:对输出的记录运行任意计算,可实现自定义写逻辑,默认提供至少一次写保证,可通过batchId去重实现精确一次保证,支持追加、更新、完整输出模式。
      • ForeachBatch接收器:在每个微批处理的输出上执行指定函数,支持在Scala、Java和Python中使用,可重用现有批数据源、写入多个位置、应用其他DataFrame操作,但不与连续处理模式一起工作,其容错性取决于实现。
      • 控制台接收器(用于调试):每次触发时将输出打印到控制台/标准输出,支持追加和完整输出模式,适用于低数据量的调试,因为整个输出在每次触发后收集并存储在驱动程序内存中。
      • 内存接收器(用于调试):输出存储在内存中作为内存表,支持追加和完整输出模式,适用于低数据量的调试,但需谨慎使用,因为在完整模式下,重启查询会重新创建整个表。

使用Foreach和ForeachBatch

  • ForeachBatch:允许在流式查询的每个微批处理的输出数据上执行指定函数,从Spark 2.4开始支持在Scala、Java和Python中使用。它接受两个参数:一个包含微批处理输出数据的DataFrame或Dataset,以及微批处理的唯一ID。
    • 通过foreachBatch,可以实现以下功能:
      • 重用现有批数据源:对于许多存储系统,可能没有可用的流接收器,但可能已有批查询的数据写入器。使用foreachBatch,可以在每个微批处理的输出上使用批数据写入器。
      • 写入多个位置:如果要将流式查询的输出写入多个位置,可以简单地多次写入输出DataFrame/Dataset。然而,每次写入尝试可能导致输出数据重新计算(包括可能重新读取输入数据)。为避免重新计算,应缓存输出DataFrame/Dataset,写入多个位置,然后取消缓存。
      • 应用其他DataFrame操作:许多DataFrame和Dataset操作在流式DataFrames中不支持,因为Spark在这些情况下不支持生成增量计划。使用foreachBatch,可以在每个微批处理输出上应用其中一些操作,但需要自己考虑执行该操作的端到端语义。
    • 注意事项:
      • 默认情况下,foreachBatch仅提供至少一次写保证。但是,可以使用提供给函数的batchId作为去重输出的方式,以获得精确一次保证。
      • foreachBatch不与连续处理模式一起工作,因为它从根本上依赖于流式查询的微批处理执行。如果在连续模式下写入数据,请使用foreach
  • Foreach:如果foreachBatch不可用(例如,相应的批数据写入器不存在,或处于连续处理模式),则可以使用foreach表达自定义写入逻辑。具体来说,可以将数据写入逻辑分为三个方法:openprocessclose。从Spark 2.4开始,在Scala、Java和Python中可用。
    • 在Python中,可以通过两种方式调用foreach:在函数中或在对象中。函数提供了一种简单的方式来表达处理逻辑,但在因故障导致某些输入数据重新处理时,无法对生成的数据进行去重。对于这种情况,必须在对象中指定处理逻辑。
    • 执行语义:当流式查询启动时,Spark按以下方式调用函数或对象的方法:
      • 此对象的单个副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责处理以分布式方式生成的数据的一个分区。
      • 此对象必须是可序列化的,因为每个任务将获得提供对象的新序列化 - 反序列化副本。因此,强烈建议在open()方法调用后进行写入数据的任何初始化(例如,打开连接或启动事务),这表示任务已准备好生成数据。
      • 方法生命周期如下:
        • 对于每个具有partitionId的分区:
          • 对于每个具有epochId的流数据批/纪元:
            • 调用open(partitionId, epochId)方法。
            • 如果open(…)返回true,对于分区和批/纪元中的每一行,调用process(row)方法。
            • 调用close(error)方法,其中error是在处理行时看到的任何错误(如果有)。
        • 如果存在open()方法且成功返回(无论返回值如何),则调用close()方法(如果存在),除非JVM或Python进程在中间崩溃。
      • 注意:Spark不保证(partitionId, epochId)的输出相同,因此不能使用(partitionId, epochId)实现去重。例如,源由于某些原因提供不同数量的分区,Spark优化更改分区数量等。有关更多详细信息,请参阅SPARK - 28650。如果需要对输出进行去重,请尝试使用foreachBatch

触发器

流式查询的触发器设置定义了流数据处理的时间,查询可以作为微批处理查询(具有固定批处理间隔)或连续处理查询执行。支持以下类型的触发器:

  • 未指定(默认):如果未显式指定触发器设置,则默认情况下,查询将以微批处理模式执行,在前一个微批处理完成处理后,将立即生成新的微批处理。
  • 固定间隔微批处理:查询将以微批处理模式执行,微批处理将在用户指定的间隔启动。如果前一个微批处理在间隔内完成,引擎将等待直到间隔结束才启动下一个微批处理;如果前一个微批处理花费的时间超过间隔(即错过间隔边界),则下一个微批处理将在前一个完成后立即启动(即不会等待下一个间隔边界);如果没有新数据可用,则不会启动微批处理。
  • 一次性微批处理:查询将仅执行一个微批处理来处理所有可用数据,然后自行停止。这在希望定期启动集群、处理自上一周期以来可用的所有数据,然后关闭集群的场景中很有用,在某些情况下,这可能会节省大量成本。
  • 连续处理(具有固定检查点间隔,实验性):查询将在新的低延迟、连续处理模式下执行。请在下面的“连续处理”部分中了解更多信息。

以下是一些代码示例:

# 默认触发器(尽快运行微批处理)
df.writeStream \
 .format("console") \
 .start()

# 处理时间触发器,微批处理间隔为两秒
df.writeStream \
 .format("console") \
 .trigger(processingTime='2 seconds') \
 .start()

# 一次性触发器
df.writeStream \
 .format("console") \
 .trigger(once=True) \
 .start()

# 连续触发器,检查点间隔为一秒
df.writeStream \
 .format("console") \
 .trigger(continuous='1 second') \
 .start()

管理流式查询

启动查询时创建的StreamingQuery对象可用于监控和管理查询。可以使用该对象获取查询的唯一标识符、运行ID、名称、解释查询、停止查询、阻塞直到查询终止、获取查询异常(如果查询因错误终止)、获取最近的进度更新数组和最近的进度更新。
可以在单个SparkSession中启动任意数量的查询,它们将并发运行并共享集群资源。可以使用sparkSession.streams()获取StreamingQueryManager,用于管理当前活动的查询,如获取活动查询列表、通过ID获取查询对象、阻塞直到任何一个查询终止。

监控流式查询

  • 交互式读取指标:可以使用streamingQuery.lastProgress()streamingQuery.status()直接获取活动查询的当前状态和指标。lastProgress()在Scala和Java中返回StreamingQueryProgress对象,在Python中返回包含相同字段的字典,它包含有关流的上一次触发中处理进度的所有信息,如处理的数据、处理速率、延迟等。status()在Scala和Java中返回StreamingQueryStatus对象,在Python中返回包含相同字段的字典,它提供有关查询当前正在执行的操作的信息,如是否正在等待数据到达、是否有触发活动、是否正在处理数据等。
  • 使用异步API以编程方式报告指标:可以通过附加StreamingQueryListener(在Scala和Java中可用)异步监控与SparkSession关联的所有查询。附加自定义StreamingQueryListener对象后,将在查询启动和停止以及活动查询取得进展时获得回调。
  • 使用Dropwizard报告指标:Spark支持使用Dropwizard库报告指标。要启用Structured Streaming查询的指标报告,必须在SparkSession中显式启用spark.sql.streaming.metricsEnabled配置。启用此配置后,在SparkSession中启动的所有查询将通过Dropwizard将指标报告到已配置的接收器(如Ganglia、Graphite、JMX等)。

从故障中恢复(使用检查点)

在发生故障或有意关闭时,可以恢复先前查询的进度和状态,并从停止的地方继续。这通过检查点和预写日志实现。可以在查询启动时在DataStreamWriter中配置检查点位置,查询将把所有进度信息(即每个触发中处理的偏移范围)和运行聚合(如快速示例中的单词计数)保存到检查点位置。检查点位置必须是HDFS兼容文件系统中的路径。

流式查询更改后的恢复语义

在从同一检查点位置重新启动之间,对流式查询的更改存在限制。以下是一些不允许或效果未明确定义的更改类型:

  • 输入源的数量或类型更改:不允许。
  • 输入源参数的更改:是否允许以及更改的语义是否明确定义取决于源和查询。例如,添加/删除/修改速率限制通常是允许的,但更改订阅的主题/文件通常不允许,因为结果不可预测。
  • 输出接收器类型的更改:某些特定接收器组合之间的更改是允许的,但需要逐案验证。例如,从文件接收器更改为Kafka接收器是允许的,Kafka将仅看到新数据;从Kafka接收器更改为文件接收器不允许;从Kafka接收器更改为foreach或反之亦然是允许的。
  • 输出接收器参数的更改:是否允许以及更改的语义是否明确定义取决于接收器和查询。例如,更改文件接收器的输出目录不允许;更改Kafka接收器的输出主题是允许的;更改用户定义的foreach接收器(即ForeachWriter代码)是允许的,但更改的语义取决于代码。
  • 投影/过滤/映射类操作的更改:某些情况是允许的。例如,添加/删除过滤器是允许的;具有相同输出模式的投影更改是允许的;具有不同输出模式的投影更改在输出接收器允许模式更改的情况下是有条件允许的。
  • 有状态操作的更改:流式查询中的一些操作需要维护状态数据以持续更新结果。Structured Streaming自动将状态数据检查点到容错存储(如HDFS、AWS S3、Azure Blob存储)并在重新启动后恢复。但是,这假定状态数据的模式在重新启动之间保持不变。这意味着在重新启动之间,对流式查询的任何有状态操作的更改(即添加、删除或模式修改)都是不允许的。例如,流式聚合、流式去重、流 - 流连接、任意有状态操作等,若要支持状态模式更改,可以使用编码/解码方案(如将状态保存为Avro编码字节),以便在查询重新启动之间自由更改状态模式,因为二进制状态将始终成功恢复。

五、连续处理

实验性

连续处理是Spark 2.3引入的一种新的实验性流处理执行模式,可实现低至1毫秒的端到端延迟和至少一次容错保证,与默认的微批处理引擎(可实现精确一次保证但延迟最低约为100毫秒)相比。对于某些类型的查询(如下所述),无需修改应用逻辑(即无需更改DataFrame/Dataset操作)即可选择执行模式。

要在连续处理模式下运行支持的查询,只需指定一个连续触发器并设置所需的检查点间隔作为参数。例如:

spark \
 .readStream \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
 .option("subscribe", "topic1") \
 .load() \
 .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
 .writeStream \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
 .option("topic", "topic1") \
 .trigger(continuous="1 second") \     # 仅查询中的更改
 .start()

检查点间隔为1秒意味着连续处理引擎将每秒记录查询的进度。生成的检查点格式与微批处理引擎兼容,因此任何查询都可以在任何触发器下重新启动。例如,以微批处理模式启动的支持查询可以在连续模式下重新启动,反之亦然。请注意,每次切换到连续模式时,将获得至少一次容错保证。

支持的查询

截至Spark 2.4,连续处理模式仅支持以下类型的查询:

  • 操作:仅支持映射类Dataset/DataFrame操作,即仅投影(selectmapflatMapmapPartitions等)和选择(wherefilter等)操作。支持除聚合函数(因为聚合尚未支持)、current_timestamp()current_date()(使用时间的确定性计算具有挑战性)之外的所有SQL函数。
    • Kafka源:支持所有选项。
    • 速率源:适合测试,仅支持连续模式下的numPartitionsrowsPerSecond选项。
  • 接收器
    • Kafka接收器:支持所有选项。
    • 内存接收器:适合调试。
    • 控制台接收器:适合调试,支持所有选项,控制台将按指定的连续触发检查点间隔打印。

注意事项

  • 连续处理引擎启动多个长期运行的任务,这些任务不断从源读取数据、处理数据并持续写入接收器。查询所需的任务数量取决于查询可以从源并行读取的分区数量。因此,在启动连续处理查询之前,必须确保集群中有足够的核心来并行运行所有任务。例如,如果从具有10个分区的Kafka主题读取数据,则集群必须至少有10个核心才能使查询取得进展。

  • 停止连续处理流可能会产生虚假的任务终止警告,可以安全地忽略这些警告。

  • 当前没有自动重试失败任务的功能,任何失败都会导致查询停止,需要手动从检查点重新启动。

六、其他信息

注意事项

  • 几个配置在查询运行后不可修改,要更改它们,需丢弃检查点并启动新查询,这些配置包括:
    • spark.sql.shuffle.partitions:这是由于状态的物理分区,状态通过对键应用哈希函数进行分区,因此状态的分区数应保持不变。如果要为有状态操作运行更少的任务,coalesce会有助于避免不必要的重新分区,在coalesce之后,减少的任务数量将保持,除非发生另一次洗牌。
    • spark.sql.streaming.stateStore.providerClass:为了正确读取查询的先前状态,状态存储提供程序的类应保持不变。
    • spark.sql.streaming.multipleWatermarkPolicy:修改此配置会导致查询包含多个水印时水印值不一致,因此策略应保持不变。

原文地址:https://blog.csdn.net/qq_68076599/article/details/143820244

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!