自学内容网 自学内容网

spark:Structured Streaming介绍

1. Structured Streaming介绍

spark进行实时数据流计算时有两个工具:Spark Streaming Structured Streaming
Spark Streaming:编写RDD代码处理数据流,可以解决非结构化的流式数据。
Structured Streaming:编写df代码处理数据流,可以解决结构化和半结构化的流式数据。
Structured Streaming 是基于 Spark SQL 引擎构建的可扩展容错流处理引擎,是在Spark 2.X出来的流框架,采用了无界表(使用DataFrame)的概念,流数据相当于往一个表上不断追加行,数据会被转化为DataFrame和DataSets类型,可用采用SQL的方式操作结构化流数据。

1.1 实时计算和离线计算

1.1.1 实时计算

实时计算,通常也称为“实时流计算”“流式计算”
流数据处理是指实时、连续地处理数据流。数据在被产生或接收后立即处理,并不需要等待所有数据到齐。数据的处理和传输是“逐条”进行的。

  • 处理时间:由于数据被实时处理,系统响应时间非常短,通常在毫秒或者秒级。
  • 数据量:流数据通常是无限的,数据持续不断地被生成和处理,系统需要持续运行。

1.1.2 离线计算

离线计算,通常也称为“批处理”,表示那些离线批量、延时较高的静态数据处理过程。
批数据处理是指在一个预定时间内收集一批数据,然后一次性对这批数据进行处理。数据是成批处理的,而不是逐条处理的。

  • 处理时间:批处理通常不是实时的,处理的延迟可能是分钟、小时甚至更长,T+1.
  • 数据量:批处理通常在所有数据收集完毕后进行,这意味着处理的数据集是固定大小的(如每日、每小时的数据)。数据处理完成后自动结束。

1.2 有界和无界数据

  • 有界数据
    • 有起始位置,有结束位置。比如文件数据,有起始行,有结束行。
    • 有明确的数据容量大小。处理数据时就能知道处理的数据大小。
    • 在处理数据时,按批次处理。
    • 数据处理完程序就结束
    • 离线计算时处理的都是有界数据。
  • 无界数据
    • 有起始位置,没有结束位置,知道数据的起始位置在哪里,但是数据的结束位置不知道(因为数据在不断产生,什么时候结束不知道)。
    • 流式数据都是无界数据。
    • 无界数据的总量是不确定的,数据是不断产生的。
    • 数据有时效性(有效期)。
    • 处理无界数据时,程序是持续运行的

2. 简单使用

  • socket服务
    • 安装ncat服务
      • 在线安装
        • yum install nc
      • 离线安装
        • rpm -ivh ncat-7.93-1.x86_64.rpm
    • 启动服务绑定9999端口
      • ncat -lk 9999
        在这里插入图片描述
  • Structured Streaming代码程序
    • 使用的是SparkSQL,所以在进行代码编写时使用SparkSQL的方法进行编写。
    • 使用 SparkSession
2,李四,22,男
3,王五,20,男
4,赵六,20,男
4,赵六,32,男
5,赵六,42,男
# 读取scoket产生的实时数据流
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

#数据流读取使用readStream
opt = {
    #指定读取socket服务的IP地址
    'host':'192.168.88.100',
    #指定读取端口
    'port':9999
}
#使用socket方式读取网站服务的数据,得到一个无界的dataframe
df = ss.readStream.format('socket').options(**opt).load()

#实时输出数据
df.writeStream.start(format='console', outputMode='append').awaitTermination()

运行结果:
在这里插入图片描述

3. 编程模型

在进行流式数据开发,代码实现的过程
Structured Streaming在处理数据时分为四个部分

  • Input Table输入数据表 无界表 readStream

  • Qurey 对数据进行查询计算 DSL或SQL计算过程

  • Result Table 保存计算结果

  • Output 输出结果 writerStream
    在这里插入图片描述

  • 读取指定数据源的数据得到一个无界表数据

  • 对无界表数据进行计算处理

  • 接收无界表计算的结果

  • 将接收到的结果进行输出

4. 数据处理流程

4.1 读取数据Source

支持读取的数据源

  • 文件数据
    • 从文件中读取数据
  • kafka数据 常用
    • 从kafka中读取数据
  • socket数据
    • 从网络端口读取数据
  • Rate数据
    • 框架自己产生数据,测试性能,优化参数

4.1.1 文件数据处理

option参数描述说明
maxFilesPerTrigger每个batch最多的文件数,默认是没有限制。比如我设置了这个值为1,那么同时增加了5个文件,这5个文件会每个文件作为一波数据,更新streaming dataframe。
latestFirst是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false)。
fileNameOnly是否检查新文件只有文件名而不是完整路径(默认值:false) 将此设置为 true 时,以下文件将被视为同一个文件,因为它们的文件名“dataset.txt”相同: “file:///dataset.txt” “s3://a/数据集.txt " “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt”。
  • 格式
spark.readStream.csv()

spark.readStream.load(format='csv',**options)
#流式读取文件数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()
# 读取文件数据
# 指定目录  不要指定文件名
# df = ss.readStream.load(format='csv',path='hdfs://node1:8020/data_stream',schema='id int,name string,gender string,age int,cls string')
# df = ss.readStream.csv(path='hdfs://node1:8020/data_stream',header=False,sep=',',schema='id int,name string,gender string,age int,cls string')
# df = ss.readStream.text(path='hdfs://node1:8020/data_stream2')
df = ss.readStream.option('maxFilesPerTrigger',1).csv('hdfs://node1:8020/data_csv',header=False,sep=',',schema='id int,name string,gender string,age int,cls string')

#输出
df.writeStream.start(format='console',outputMode='append').awaitTermination()

运行结果:
在这里插入图片描述
注意

  • 读取文件数据时,不能指定某个具体文件,而是指定文件所在的目录。
    • 目录下的同一个文件只会被读取一次,处理过的文件数据不会再重新处理。
  • 文件的读取方式在实际开发中用的比较少,生产一条数据,就要生成一个文件。但是,如果将多条数据收集之后写入统一文件,就变成了和批处理一样的开发。
  • 实际开发中很少使用spark流读文件,可以使用flume工具流式读取文件,然后在通过spark读取产生flume。

4.2 计算操作 Operation

采用DSL的方式进行数据的计算
where 方法
groupby
orderby

#数据流的计算
from pyspark.sql import SparkSession, functions as F

ss = SparkSession.builder.getOrCreate()
#流式读取数据,转为无界的dataframe
opt = {
    # 指定读取的socket服务的ip地址
    'host':'192.168.88.100',
    # 指定读取的端口
    'port':9999
}
df = ss.readStream.load(format='socket',**opt)
#流式数据的计算
#SQL
df.createTempView('stu')
sql_str = """
select gender,count(id),sum(age) from (
  select split(value,',')[0] as id,
  split(value,',')[1] as name,
  cast(split(value,',')[2] as int) as age,
  split(value,',')[3] as gender
   from stu) t1
   group by gender
"""
df_res = ss.sql(sql_str)

#结果数据输出
df_res.writeStream.start(format='console',outputMode='complete').awaitTermination()

输入:
在这里插入图片描述
运行结果:
在这里插入图片描述

4.3 数据输出 Sink

在StructuredStreaming中定义好Result DataFrame/Dataset后,调用writeStream()返回DataStreamWriter对象,设置查询Query输出相关属性,启动流式应用运行,相关属性如下

  • 输出模式(output mode):指定写入输出接收器的内容。
  • 查询名称(query name):(可选)指定用于标识的查询的唯一名称。将数据输出到内存可以指定一个表名
  • 触发间隔 (Trigger interval):(可选)指定触发间隔。如果未指定,系统将在上一次处理完成后立即检查新数据的可用性。如果由于先前的处理尚未完成而错过触发时间,则系统将立即触发处理。 写入数据的时间间隔
  • 检查点位置(checkpoint Location):对于可以保证端到端容错的某些输出接收器,请指定系统将写入所有检查点信息的位置。这应该是与HDFS兼容的容错文件系统中的一个目录。 文件方式输出时,必须指定。

4.3.1 输出模式

outputMode指定输出模式

  • append模式,默认的模式,每次只能看到新增的行的内容,不支持聚合操作,一般在进行查询展示时使用。
  • complete模式,每次都是对所有数据进行处理,必须聚合操作。
  • update模式,当数据只有新增,没有聚合类似append;如果对数据进行聚合,只会显示更新的数据。
    append模式:
# 输出模式
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()


# 读取数据
df = ss.readStream.csv('hdfs://node1:8020/data_csv', header=False, sep=',',schema='id int,name string,gender string,age int,cls string')

# 数据计算
df_result = df.select(df.id,df.name,df.age,df.gender).where('age >= 20')

# 数据输出
# outputMode 输出模式
# append    输出最新的数据           支持 select,where
# complete  输出所有聚合计算后的数据   支持 groupby聚合计算
# update    输出最新的数据包括聚合计算 支持 groupby聚合计算 select,where
df_result.writeStream.start(format='console',outputMode='append').awaitTermination()

运行结果:
在这里插入图片描述
complete模式:

# 输出模式
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()


# 读取数据
df = ss.readStream.csv('hdfs://node1:8020/data_csv', header=False, sep=',',schema='id int,name string,gender string,age int,cls string')

# 数据计算
# df_result = df.select(df.id,df.name,df.age,df.gender).where('age >= 20')
df_result = df.groupby('gender').avg('age')
# 数据输出
# outputMode 输出模式
# append    输出最新的数据           支持 select,where
# complete  输出所有聚合计算后的数据   支持 groupby聚合计算
# update    输出最新的数据包括聚合计算 支持 groupby聚合计算
df_result.writeStream.start(format='console',outputMode='complete').awaitTermination()

运行结果:
在这里插入图片描述
update模式:

# 输出模式
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()


# 读取数据
df = ss.readStream.csv('hdfs://node1:8020/data_csv', header=False, sep=',',schema='id int,name string,gender string,age int,cls string')

# 数据计算
# df_result = df.select(df.id,df.name,df.age,df.gender).where('age >= 20')
df_result = df.groupby('gender').avg('age')
# 数据输出
# outputMode 输出模式
# append    输出最新的数据           支持 select,where
# complete  输出所有聚合计算后的数据   支持 groupby聚合计算
# update    输出最新的数据包括聚合计算 支持 groupby聚合计算
df_result.writeStream.start(format='console',outputMode='update').awaitTermination()

运行结果:
在这里插入图片描述
总结:三种模式的使用场景

  • append 适合没有聚合操作的计算结果输出,将数据输出到文件需要用的是append。
  • complete 适合进行聚合操作,并且显示所有的数据计算结果。
  • update 适合进行聚合操作,只展示新增数据的结果。

4.3.2 指定输出位置

  • File Sink 把结果输出到文件中,仅支持追加 append模式
  • Kafka Sink 把结果输出到kafka的topic中,append complete update都支持。
  • Foreach Sink ForeachBatch Sink ,可以接收函数/对象,其中可以定义复杂的计算逻辑,对数据进行处理。数据最终输出到哪里,自己决定,append complete update都支持。
  • console Sink,直接在终端中显示,append complete update都支持。
  • Memory Sink,把数据输出到内存中,以表的形式存在,可以使用SparkSQL进行查询,支持append complete模式。
    输出到文件
  • 格式
writeStream.start(path='输出文件路径',format='输出源',outputMode='输出模式',checkpointLocation='检查点路径')
  • 使用

from pyspark.sql import SparkSession,functions as F

ss=SparkSession.builder.getOrCreate()

opt = {
    # 指定读取的socket服务的ip地址
    'host': '192.168.88.100',
    # 指定读取的端口
    'port': 9999
}
#使用soc
df = ss.readStream.load(format='socket',**opt)

#数据计算
df_res = df.select(
    F.split('value',',')[0].alias('id'),
    F.split('value',',')[1].alias('name'),
    F.split('value', ',')[2].cast('int').alias('age'),
    F.split('value', ',')[3].alias('gender')
).where('age >30')

# 将数据输出到文件中
option ={
    'checkpointLocation':'hdfs://node1:8020/stream_checkpoint'
}
df_res.writeStream.start(format='csv',outputMode='append',path='hdfs://node1:8020/csv_data',**option).awaitTermination()

运行结果:
在这里插入图片描述
注意

  • 写文件需要指定checkpointLocation
  • 不支持聚合数据写入文件
    ForeachBatch:
    需要自定义方法,完成对数据的计算,然后按照需求写入对应位置。本质是将df输出的数据传递到一个方法中。
  • 格式
# 函数
def foreach_batch_function(df,df_id):
# Transform and write batchDF    
pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()  
  • 使用
# 流数据输出到自定义函数中,将无界表就会转为有界表数据
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 读取数据
df = ss.readStream.csv('hdfs://node1:8020/data_csv', header=False, sep=',',schema='id int,name string,gender string,age int,cls string')

# 数据计算
df_result = df.groupby('gender').avg('age')


# 数据输出
# 自定义函数
def func(df,batch_id):
    # 必须接受两个数据
    # df数据就是接受无界表的数据,将无界表转为了有界表的df
    # batch_id 处理数据的批次编号
    df.show()
    print(batch_id)
    # 将有界表数据写入文件
    df.write.csv('hdfs://node1:8020/foreach_data_csv',header=True,sep=',',mode='overwrite')


# 在输出时执行foreachBatch方法
df_result.writeStream.foreachBatch(func).start(outputMode='complete').awaitTermination()

运行结果:

4.3.3 设置触发器Trigger

触发器决定多久执行一次,在流式处理中,等一会儿(等多久)就是由触发器决定。

  • 格式
writeStream.trigger(processingTime=None, once=None, continuous=None).start().awaitTermination()
  • 默认触发器

    • 没有设置触发器,执行完上一个批次,立即执行下一个批次,下一批次没有数据则等待产生新数据, 请求时间间隔的长短由spark对数据计算的时间决定。
      • spark是基于内存计算的,所有计算时间会很多,官方的文档中 时间最短能达到1ms。
    • 固定时间间隔 processingTime=‘5 seconds’
      • 如果前一个微批处理在间隔内完成,则引擎将等待间隔结束,然后再启动下一个微批处理。
      • 如果前一个微批处理的完成时间比间隔时间长(即如果错过了一个间隔边界),那么下一个微批处理将在前一个完成后立即开始(即它不会等待下一个间隔边界)。
      • 如果没有新数据可用,则不会启动微批处理。
# 获取数据时的请求时间间隔设置
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 读取数据
options = {
    'host':'192.168.88.100',
    'port':9999
}
df = ss.readStream.load(format='socket',**options)


# 数据计算
df_result = df.select(df.value)


# 结果输出
df.writeStream.trigger(processingTime='5 seconds').start(format='console',outputMode='append').awaitTermination()

运行结果:
在这里插入图片描述

  • 一次性微批处理 once=True 默认是false。

    • 只会执行一次,适用于初始化 关闭资源这种只执行一次的场景。
# 获取数据时的请求时间间隔设置
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 读取数据
options = {
    'host':'192.168.88.100',
    'port':9999
}
df = ss.readStream.load(format='socket',**options)


# 数据计算
df_result = df.select(df.value)


# 结果输出
df.writeStream.trigger(once=True).start(format='console',outputMode='append').awaitTermination()

运行结果:
在这里插入图片描述

  • 以固定时间间隔连续处理 continuous=‘5 seconds’
    • 对固定间隔进行优化, 从而减低了延迟性的问题。
    • 和固定间隔微批处理行为上很像,无论是否有数据,到指定间隔时间后都会触发。
# 获取数据时的请求时间间隔设置
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()

# 读取数据
options = {
    'host':'192.168.88.100',
    'port':9999
}
df = ss.readStream.load(format='socket',**options)


# 数据计算
df_result = df.select(df.value)


# 结果输出
# trigger的continuous不支持文件数据源
df.writeStream.trigger(continuous='5 seconds').start(format='console',outputMode='append').awaitTermination()

运行结果:
在这里插入图片描述

4.3.4 checkpoint检查目录设置

当获取到数据进行计算时,有可能数据在计算时,会计算失败。此时spark会重新进行计算,此时就需要知道要计算哪个数据,就需要借助checkpoint机制,将当前计算的信息保存起立,方便重新进行计算。保证spark处理数据的容错

1、偏移量目录【offsets】:记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据,在处理数据前将其写入此日志记录。此日志中的第 N 条记录表示当前正在已处理,第 N-1 个条目指示哪些偏移已处理完成。
2、提交记录目录【commits】:记录已完成的批次,重启任务检查完成的批次与 offsets 批次记录比对,确定接下来运行的批次;
3、元数据文件【metadata】:metadata 与整个查询关联的元数据,目前仅保留当前job id
4、记录状态目录【state】:当有状态操作时,如累加聚合、去重、最大最小等场景,这个目录会被用来记录这些状态数据,根据配置周期性地生成.snapshot文件用于记录状态。

#流计算的检查点设置
from pyspark.sql import SparkSession, functions as F

ss = SparkSession.builder.getOrCreate()

#流式读取数据,转为无界的DataFrame
opt = {
    # 指定读取的socket服务的ip地址
    'host': '192.168.88.100',
    # 指定读取的端口
    'port': 9999
}
df = ss.readStream.load(format='socket',**opt)

#流式数据计算
#SQL
df.createTempView('stu')
sql_str = """
    select gender,count(id),sum(age) from (
    select 
    split(value,',')[0] as id, 
    split(value,',')[1] as name, 
    cast(split(value,',')[2] as int) as age,
    split(value,',')[3] as gender
    from stu) t1
    group by gender

"""
df_res = ss.sql(sql_str)
#结果数据输出
option = {
    #指定checkpoint 每个计算任务都有独立的checkpoint位置
    'checkpointLocation':'hdfs://node1:8020/checkpoint_stream'
}
df_res.writeStream.start(format='console',outputMode='complete',**option).awaitTermination()

运行结果:
在这里插入图片描述


原文地址:https://blog.csdn.net/m0_70882914/article/details/142964165

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!