自学内容网 自学内容网

74、DataStreamAPI-1.19 使用文档

一、DataStream API

1、概述

1)Flink程序剖析
1.Flink程序组成
a)Flink程序基本组成
  • 获取一个执行环境(execution environment)
  • 加载/创建初始数据;
  • 指定数据相关的转换;
  • 指定计算结果的存储位置;
  • 触发程序执行。
b)创建执行环境
getExecutionEnvironment();

createLocalEnvironment();

createRemoteEnvironment(String host, int port, String... jarFiles);

通常,只使用 getExecutionEnvironment() 即可,该方法会根据上下文做正确的处理

  • 如果在 IDE 中执行程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在本地机器上执行程序。
  • 如果基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,同时 getExecutionEnvironment() 方法会返回一个执行环境以在集群上执行程序。
c)指定数据源

执行环境提供了一些方法,支持使用各种方法从文件中读取数据:可以直接逐行读取数据,像读 CSV 文件一样,或使用任何第三方提供的 source,将一个文本文件作为一个行的序列来读取:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");
d)应用转换

指定 data sources 将生成一个 DataStream,可以在上面应用转换(transformation)来创建新的派生 DataStream,可以调用 DataStream 上具有转换功能的方法来应用转换,使用 map 的转换:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});
e)指定数据汇

通过创建 sink 把包含最终结果的 DataStream 写到外部系统。

writeAsText(String path);

print();
f)触发程序执行

需要调用 StreamExecutionEnvironmentexecute() 方法来触发程序执行,根据 ExecutionEnvironment 的类型,执行会在本地机器上触发,或将程序提交到某个集群上执行。

  • execute() 方法将等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。
image-20240424112012151
  • 如果不想等待作业完成,可以通过调用 StreamExecutionEnvironmentexecuteAsync() 方法来触发作业异步执行,它会返回一个 JobClient,可以通过它与刚刚提交的作业进行通信。
image-20240424112236087

如下是使用 executeAsync() 实现 execute() 语义的示例。

final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
g)注意

所有 Flink 程序都是延迟执行的

  • 当程序的 main 方法被执行时,数据加载和转换不会直接发生,而是每个算子都被创建并添加到 dataflow 形成的有向图中;
  • 当被执行环境的 execute() 方法显示地触发时,这些算子才会真正执行。
2.Data Sources

Source 用于程序获取数据,可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到程序中。

Flink 自带了许多预先实现的 source functions,也可以实现 SourceFunction 接口编写自定义的非并行 source,也可以实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

通过 StreamExecutionEnvironment 可以访问预定义的 stream source:

基于文件

  • readTextFile(path) - 读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是前两个方法内部调用的方法,它基于给定的 fileInputFormat 读取路径 path 上的文件,根据提供的 watchType 的不同,source 可以定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE),使用 pathFilter,可以过滤正在处理的文件。

    实现

    在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控数据读取

    每个子任务都由一个单独的实体实现,监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行,读取任务的并行度和作业的并行度相等。

    单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。

    Reader 是将实际获取数据的角色,每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

    重要提示

    1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理,这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。
    2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容,reader 会继续读取数据,直到所有文件内容都读完,关闭 source 会导致在那之后不再有检查点,这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取

基于套接字

  • socketTextStream - 从套接字读取,元素可以由分隔符分隔。

基于集合

  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流,集合中的所有元素必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据流,class 参数指定迭代器返回元素的数据类型。
  • fromElements(T ...) - 从给定的对象序列中创建数据流,所有的对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流,class参数指定迭代器返回元素的数据类型。
  • fromSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。

自定义

  • addSource - 关联一个新的 source function,可以使用 addSource(new FlinkKafkaConsumer<>(...)) 从 Apache Kafka 获取数据,更多参考 DataStream Connectors。
3.DataStream Transformations

参考算子。

4.Data Sinks

Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。

Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:

  • writeAsText() / TextOutputFormat - 将元素按行写成字符串,通过调用每个元素的 toString() 方法获得字符串。
  • writeAsCsv(...) / CsvOutputFormat - 将元组写成逗号分隔的文件,行和字段的分隔符是可配置的,每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值,可以提供一个前缀(msg)附加到输出,有助于区分不同的 print 调用,如果并行度大于1,输出结果将附带输出任务标识符的前缀。
  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类,支持自定义 object 到 byte 的转换。
  • writeToSocket - 根据 SerializationSchema 将元素写入套接字。
  • addSink - 调用自定义 sink function,Flink 捆绑了连接到其它系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

注意

DataStream 的 write*() 方法主要用于调试,不参与 Flink 的 checkpointing,通常这些函数具有至少有一次语义,刷新到目标系统的数据取决于 OutputFormat 的实现,并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中;此外,在失败的情况下,这些记录可能会丢失。

为了将流可靠地、精准一次地传输到文件系统中,请使用 FileSink;通过 .addSink(...) 方法调用的自定义实现也可以参与 Flink 的 checkpointing,以实现精准一次的语义。

5.执行参数

StreamExecutionEnvironment 包含了 ExecutionConfig,它允许在运行时设置作业特定的配置值。

参数的说明可参考执行配置,这些参数特别适用于 DataStream API:

  • setAutoWatermarkInterval(long milliseconds):设置自动发送 watermark 的时间间隔,可以使用 long getAutoWatermarkInterval() 获取当前配置值。
image-20240424142359030
a)容错

State & Checkpointing 描述了如何启用和配置 Flink 的 checkpointing 机制。

b)控制延迟

默认情况下,元素不会在网络上一一传输(这会导致不必要的网络传输),而是被缓冲。

缓冲区的大小(实际在机器之间传输)可以在 Flink 配置文件中设置,虽然此方法有利于优化吞吐量,但当输入流不够快时,它可能会导致延迟问题。

要控制吞吐量和延迟,可以调用执行环境(或单个算子)的 env.setBufferTimeout(timeoutMillis) 方法来设置缓冲区填满的最长等待时间,超过此时间后,即使缓冲区没有未满,也会被自动发送,超时时间的默认值为 100 毫秒。

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

为了最大限度地提高吞吐量,设置 setBufferTimeout(-1) 来删除超时,这样缓冲区仅在已满时才会被刷新。

要最小化延迟,请将超时设置为接近 0 的值(例如 5 或 10 毫秒)应避免超时为 0 的缓冲区,因为它会导致严重的性能下降。

6.调试
a)本地执行环境

LocalStreamEnvironment 在创建它的同一个 JVM 进程中启动 Flink 系统,如果从 IDE 启动 LocalEnvironment,则可以在代码中设置断点并轻松调试程序。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// 构建你的程序

env.execute();
b)集合 Data Sources

Flink 提供了由 Java 集合支持的特殊 data sources 以简化测试。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// 从元素列表创建一个 DataStream
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// 从任何 Java 集合创建一个 DataStream
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// 从迭代器创建一个 DataStream
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

注意: 集合 data source 要求数据类型和迭代器实现 Serializable,且集合 data sources 不能并行执行(parallelism = 1)。

c)迭代器 Data Sink

Flink 还提供了一个 sink 来收集 DataStream 的结果。

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync();

注意:在程序完成时或者CheckPoint触发时才会输出结果。

7.总结
1.DataStream 的 write*() 方法不参与 Flink 的 checkpointing,具有至少一次语义,可以通过 FileSink 或 .addSink(...) 方法调用的自定义实现,参与 Flink 的 checkpointing,具有精准一次语义,实现将流可靠地、精准一次地传输到文件系统中;

2.通过配置 BufferTimeout 控制缓冲区刷出的延迟,通过配置 ExecutionConfig 控制执行参数;

3.通过调用 env.executeAsync() 返回的 jobClient 实现对 Flink Job 的控制(可用于在 Java 中控制 Flink 任务);

4.readFile()方法,在底层 Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取;监控由单个非并行(并行度 = 1)任务实现,而读取由多并行任务执行,读取任务的并行度和作业的并行度相等;单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于watchType),找到要处理的文件,将它们划分为分片,并将这些分片分配给下游 reader;Reader 是将获取数据的角色,每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。
2)代码案例
image-20240425101702209

2、执行模式-已总结

1)概述
1.流处理和批处理概述

DataStream API 的流(STREAMING)执行模式,适用于需要连续增量处理,而且常驻线上的无边界作业。

DataStream API 的批(BATCH)执行模式,类似于 MapReduce 等批处理框架,适用于已知输入、不会连续运行的的有边界作业。

Flink 对流处理和批处理采取统一的处理方式,无论配置何种执行模式,在有界输入上执行的 DataStream 应用都会产生相同的最终 结果;在模式执行的作业可能会产生增量更新(类似于数据库中的插入(upsert)操作),而作业只在最后产生一个最终结果。

通过启用执行模式,Flink 可以对有边界作业进行额外的优化:可以使用不同的关联(join)/ 聚合(aggregation)策略、不同 shuffle 实现来提高任务调度和故障恢复的效率等。

2.批执行模式选择时机

执行模式只能用于 有边界 的作业/Flink 程序。

边界:是数据源的一个属性,表明是否在执行之前已经知道来自该数据源的所有输入,或者新数据是否会无限期地出现;如果作业的所有源都是有边界的,则它就是有边界的,否则就是无边界的。

执行模式,既可用于有边界任务,也可用于无边界任务。

执行模式,只用于有边界任务。

使用模式运行有边界作业

  • 使用有边界作业的运行结果去初始化作业状态,并将该状态在之后的无边界作业中使用;例如,通过模式运行一个有边界作业,获取一个 savepoint,然后在一个无边界作业上恢复这个 savepoint【将 savepoint 作为 执行作业的附加输出】;
  • 为无边界数据源写测试代码时,使用有边界数据源更自然。
3.配置批执行模式

执行模式可以通过 execution.runtime-mode 配置,可选值如下:

  • STREAMING: 经典 DataStream 执行模式(默认)
  • BATCH: 在 DataStream API 上进行批量式执行
  • AUTOMATIC: 让系统根据数据源的边界性来决定

可以通过 bin/flink run ... 的命令行参数配置,或者在创建/配置 StreamExecutionEnvironment 时写进程序。

案例:通过命令行配置执行模式

bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>

案例:在代码中配置执行模式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

注意不建议在程序中设置运行模式,而是在提交应用程序时使用命令行设置,保持应用程序代码的免配置可以让程序更加灵活,因为同一个应用程序可能在任何执行模式下执行。

4.执行行为
a)任务调度与网络 Shuffle

执行模式中,因为输入数据是有边界的,Flink可以使用更高效的数据结构和算法来进行任务调度和网络 shuffle。

案例:任务调度和网络传输的差异

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> source = env.fromElements(...);

source.name("source")
.map(...).name("map1")
.map(...).name("map2")
.rebalance()
.map(...).name("map3")
.map(...).name("map4")
.keyBy((value) -> value)
.map(...).name("map5")
.map(...).name("map6")
.sinkTo(...).name("sink");

map()flatMap()filter(),可以依靠算子链链在一起,直接将数据转发到下一个操作,不涉及网络 shuffle。

keyBy()rebalance() ,需要在不同的任务并行实例之间传输数据,涉及网络 shuffle。

对于上面的例子,Flink 会将操作分组为以下任务

  • 任务1: sourcemap1map2
  • 任务2: map3map4
  • 任务3: map5map6sink

在任务1到任务2、任务2到任务3之间各有一次网络 shuffle,作业的可视化表示如下

image-20240425103902289

i)流执行模式

在**执行模式下**,所有任务需要持续运行;Flink可以通过整个管道立即处理新的记录,以达到需要的连续和低延迟的流处理;同时分配给某个作业的 TaskManagers 需要有足够的资源来同时运行所有的任务。

网络 shuffle 是 流水线 式的,记录会立即发送给下游任务,在网络层上进行一些缓冲;当处理连续的数据流时,在任务(或任务管道)之间没有可以实体化的自然数据点(时间点),而在执行模式下,中间的结果可以被实体化。

ii)批执行模式

在**执行模式下**,作业的任务可以分阶段执行;因为输入是有边界的,因此 Flink 可以在进入下一个阶段之前完全处理管道中的一个阶段;在上面的例子中,工作会有三个阶段,对应着被 shuffle 界线分开的三个任务。

分阶段处理要求 Flink 将任务的中间结果实体化到非永久存储中,让下游任务在上游任务下线后再读取,将增加处理的延迟,但这允许 Flink 在故障发生时回溯到最新的可用结果,而不是重新启动整个任务;同时作业可以在更少的资源上执行

TaskManagers 将至少在下游任务开始消费前保留中间结果,在这之后,只要空间允许,中间结果就会被保留,以便任务失败回滚。

b)State Backends / State

模式下,Flink 使用 StateBackend 控制状态的存储方式和检查点的工作方式。

模式下,配置的 state backend 被忽略;对输入按 key 分组(使用排序),依次处理一个 key 的所有记录,以便在同一时间只保留一个 key 的状态,当进行到下一个 key 时,上一个 key 的状态将被丢弃。

c)处理顺序

执行和执行中,算子或用户自定义函数(UDF)处理记录的顺序可能不同。

模式下,对于用户自定义函数,数据一到达就被处理。

模式下,Flink 确保数据有序,排序可以是特定调度任务、网络 shuffle、上文提到的 state backend 等。

将常见输入类型分为三类

  • 广播输入(broadcast input): 从广播流输入【广播状态】;
  • 常规输入(regular input): 既不从广播输入或也不从 keyed 输入;
  • keyed 输入(keyed input): 从 KeyedStream 输入;

消费多种类型输入的函数或算子,处理顺序如下

  • 广播输入第一个处理
  • 常规输入第二个处理
  • keyed 输入最后处理

对于从多个常规或广播输入进行消费的函数 — 比如 CoProcessFunction — Flink 有权从任一输入以任意顺序处理数据。

对于从多个keyed输入进行消费的函数 — 比如 KeyedCoProcessFunction — Flink 先处理单一键中的所有记录再处理下一个。

d)事件时间/水印

模式下,存在数据乱序的可能,使用 Watermark(一个带有时间戳 T 的水印)标志再没有时间戳 t < T 的元素跟进。

模式下,输入的数据集是事先已知的,至少可以按照时间戳对元素进行排序,从而按照时间顺序进行处理,不存在数据乱序的可能。

综上:在模式下,只需要在输入的末尾有一个与每个键相关的 MAX_WATERMARK,如果输入流没有键,则在输入的末尾需要一个 MAX_WATERMARK;所有注册的定时器都会在 时间结束 时触发,用户定义的 WatermarkAssignersWatermarkStrategies 会被忽略;但配置 WatermarkStrategy 是有用的,因为它的 TimestampAssigner 仍然会被用来给记录分配时间戳。

e)处理时间

处理时间是指在处理记录的具体实例上,处理记录的机器上的时钟时间,基于处理时间的计算结果是不可重复的,因为同一条记录被处理两次,会有两个不同的时间戳。

模式下,事件时间和处理时间存在相关性;在模式下事件时间的1小时也几乎是处理时间的1小时,使用处理时间可以用于早期(不完全)触发,给出预期结果的提示。

模式下,事件时间和处理时间相关性不存在,因为输入的数据集是静态的;允许用户请求当前的处理时间,并注册处理时间计时器,但与事件时间的情况一样,所有的计时器都要在输入结束时触发。

在作业执行过程中,处理时间不会提前,当整个输入处理完毕后,会快进到时间结束。

f)故障恢复

模式下,Flink 使用 checkpoint 进行故障恢复,在发生故障时,Flink 会从 checkpoint 重新启动所有正在运行的任务。

模式下,Flink 会尝试回溯到之前的中间结果仍可获取的处理阶段,只有失败的任务才需要重新启动,相比从 checkpoint 重新启动所有任务,可以提高作业的处理效率和整体处理时间。

5.重要的考虑因素
a)概述

模式下的行为变化

  • “滚动"操作,如 reduce() 或 sum(),会对模式下每一条新记录发出增量更新,在模式下,只发出最终结果。

模式下不支持

  • Checkpointing 和任何依赖于 checkpointing 的操作都不支持。
b)Checkpointing

批处理程序的故障恢复不使用检查点,因为没有 checkpoints,某些功能如 CheckpointListener ,以及因此,Kafka 的精确一次(EXACTLY_ONCE) 模式或 File Sink 的 OnCheckpointRollingPolicy 将无法工作。

仍然可以使用所有的状态原语(state primitives),只是用于故障恢复的机制会有所不同。

c)编写自定义算子

注意模式下运行良好的 Operator 可能会在模式下产生错误的结果。

  • 模式下,会逐个 key 处理记录;因此,Watermark 会在每个 key 之间从 MAX_VALUE 切换到 MIN_VALUE
  • Watermark 在一个算子中不总是递增的。
  • 定时器将首先按 key 的顺序触发,然后按每个 key 内的时间戳顺序触发。
  • 不支持手动更改 key 的操作。
6.总结
1、即使不配置执行模式,在有界输入上执行的DataStream应用会产生相同的最终结果,但是配置批处理模式执行有界作业后,可以执行额外的优化。

2、可以通过代码【不建议】和命令行【建议】配置运行模式。

3、相同的算子在批模式下和流模式下可能会产生不同的结果(水位线的触发=>定时器的触发、窗口的触发)。

4、批执行模式和流执行模式均可以配置重试策略(RestartStrategies)来重启任务,但只有流执行模式支持检查点(Checkpoint)。

5、流执行模式,数据一到达就被处理;批执行模式,数据处理顺序为【广播输入》常规输入》keyed 输入】。
2)批模式和流模式对比表
类别 流模式 批模式
任务调度 所有任务需要持续运行,消耗资源大 任务可以按Shuffle分阶段执行,消耗资源小
Shuffle 记录会被流水线式的持续发送到下游任务,在网络上进行缓冲 可以保存Shuffle分阶段执行的中间结果
State Backends & State 使用StateBackend控制状态的存储方式和检查点的工作方式 对输入按key分组排序,依次处理一个key的所有记录,在同一时间只保留一个key的状态,当处理到下一个key时,上一个key的状态将被丢弃
处理顺序 对于用户自定义函数,数据一到达就被处理 广播输入》常规输入》keyed 输入
事件时间 存在数据乱序的可能 不存在数据乱序的可能,可以按时间排序
水位线 使用Watermark(一个带有时间戳 T 的水印)标志再没有时间戳 t < T 的元素出现 在当前key的末尾有一个 MAX_WATERMARK,仅在每个key末尾触发定时器,在下一个key的开始有一个MIN_WATERMARK
处理时间 事件时间和处理时间存在相关性 事件时间和处理时间相关性不存在,允许用户请求当前的处理时间,并注册处理时间计时器,仅在每个key末尾触发计时器
故障恢复 使用 checkpoint 进行故障恢复,发生故障时,从 checkpoint 重新启动所有正在运行的任务 尝试回溯到之前的中间结果仍可获取的处理阶段,只有失败的任务才需要重新启动
3)代码案例
image-20240425163122504

3、事件时间-已总结

1)生成 Watermark
1.Watermark 策略简介

为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,即数据流中的每个元素都需要拥有可分配的事件时间戳

通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。

时间戳的分配与 watermark 的生成是齐头并进的,表明 Flink 应用程序事件时间的进度,可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式。

使用 Flink API 时需要设置一个同时包含 TimestampAssignerWatermarkGeneratorWatermarkStrategy

WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,用户也可以自定义 watermark 策略。

WatermarkStrategy 接口如下

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{

    /**
     * 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * 根据策略实例化一个 watermark 生成器。
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

可以使用 WatermarkStrategy 工具类中通用的 watermark 策略,或者使用这个工具类将自定义的 TimestampAssignerWatermarkGenerator 进行绑定。

例如,使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,实现如下:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);

其中 TimestampAssigner 的设置是可选的,大多数情况下,可以不用去特别指定。

例如,当使用 Kafka 或 Kinesis 数据源时,可以直接从 Kafka/Kinesis 数据源记录中获取到时间戳。

注意: 时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位

2.使用 Watermark 策略

WatermarkStrategy 可以在 Flink 应用程序中的两处使用,第一种是在数据源上使用,第二种是在非数据源的操作之后使用

第一种方式更好,因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息,数据源可以更精准地跟踪 watermark,整体 watermark 生成将更精确;直接在源上指定 WatermarkStrategy 意味着必须使用特定数据源接口,例如KafkaSource。

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("my-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(
    kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");

仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

注意:使用 WatermarkStrategy 去获取流并生成带有时间戳的元素和 watermark 的新流时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳分配器将覆盖原有的时间戳和 watermark

3.处理空闲数据源

如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark,称这类数据源为空闲输入空闲源

此时,当其它分区仍然发送事件数据时就会出现问题,由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。

此时,可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态,WatermarkStrategy 为此提供了一个工具接口:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));
4.Watermark 对齐

某些 splits/partitions/shards 或 source 可能会非常快地处理记录,从而使其 Watermark 的增加速度相对较快,对于使用 Watermark 处理数据的下游 Operator 来说,下游 Operator(如聚合上的窗口联接)的水印可以正常进行,但是 Operator 需要缓冲来自快速输入的过多数据量,因为来自其所有输入的最小水印被滞后。

因此,由快速输入发出的所有记录都必须在下游 Operator 的状态中进行缓冲,这可能导致 Operator 状态的不可控增长。

此时,可以启用 Watermark 对齐,确保没有 splits/partitions/shards 或 source 将其 Watermark 增加得比其它源多太多,可以分别为每个源启用对齐

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));

注意:只有 FLIP-27 的 source 可以启用水印对齐,它不适用于历史版本,不适用于在数据源之后应用#assignTimestampsAndWatermarks。

当启用Watermark 对齐时,需要告诉Flink,source 应属于哪个组,通过提供一个标签(例如alignment-group-1)来实现,该标签将共享它的所有source绑定在一起。

此外,必须告诉属于该组的所有source的当前最小水印的最大漂移,第三个参数描述了当前最大水印应该多久更新一次,频繁更新的缺点是在TM和JM之间会有更多的RPC消息传输。

为了实现对齐,Flink将暂停从源/任务进行消费,它将继续读取其他来源/任务的记录,这些来源/任务可以向前移动组合水印,从而解锁更快的水印。

注意:从Flink 1.17开始,FLIP-27源框架支持拆分级别的水印对齐,源连接器实现一个接口来恢复和暂停拆分,以便在同一任务中对齐splits/partitions/shards。

如果从1.15.x和1.16.x(含1.15.x)之间的Flink版本升级,通过将pipeline.watermark-alignment.allow-unaligned-source-splits设置为true来禁用拆分级别对齐;还可以通过检查源代码是否在运行时抛出UnsupportedOperationException或读取javadocs来判断它是否支持拆分级别的对齐,此时最好禁用拆分级别的水印对齐,以避免致命的异常。

当将标志设置为true时,只有当splits/partitions/shards的数量等于源运算符的并行度时,水印对齐才能正常工作,这导致每个子任务都被分配一个工作单元;另一方面,如果有两个Kafka分区,它们以不同的速度生成水印,并被分配给同一个任务,那么水印可能不会像预期的那样工作;但即使在最坏的情况下,基本对齐的性能也不会比根本没有对齐差。

此外,Flink还支持在相同来源和不同来源的任务之间进行对齐,当有两个不同的来源(例如Kafka和File)以不同的速度生成水印时,这很有用。

5.自定义 WatermarkGenerator
a)概述

WatermarkGenerator 接口代码如下:

/**
 * {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
 *
 * 注意:WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} 
 * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,也可以基于事件数据本身生成 watermark。
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期性的调用,也许会生成新的 watermark,也许不会。
     *
     * <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。
     */
    void onPeriodicEmit(WatermarkOutput output);
}

watermark 的生成方式本质上有两种周期性生成标记生成

周期性生成器通过 onEvent() 观察传入的事件数据,然后在框架调用 onPeriodicEmit() 时发出 watermark。

标记生成器将查看 onEvent() 中的事件数据,并检查在流中携带 watermark 的特殊标记事件或打点数据,当获取到这些事件数据时,它将立即发出 watermark,通常情况下,标记生成器不会通过 onPeriodicEmit() 发出 watermark。

b)自定义周期性 Watermark 生成器

周期性生成器会观察流事件数据并定期生成 watermark(其生成可能取决于流数据,或者完全基于处理时间)。

生成 watermark 的时间间隔(每 n 毫秒)可以通过 ExecutionConfig.setAutoWatermarkInterval(...) 指定;每次都会调用生成器的 onPeriodicEmit() 方法,如果返回的 watermark 非空且值大于前一个 watermark,则将发出新的 watermark。

如下是两个使用周期性 watermark 生成器的示例;注意:Flink 已经附带了 BoundedOutOfOrdernessWatermarks,它实现了 WatermarkGenerator,其工作原理与下面的 BoundedOutOfOrdernessGenerator 相似。

/**
 * 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。
 * 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 秒

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

/**
 * 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 秒

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // 处理时间场景下不需要实现
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}
c)自定义标记 Watermark 生成器

标记 watermark 生成器观察流事件数据并在获取到带有 watermark 信息的特殊事件元素时发出 watermark。

如下是实现标记生成器的方法,当事件带有某个指定标记时,该生成器就会发出 watermark:

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // onEvent 中已经实现
    }
}

注意: 可以针对每个事件生成 watermark,但每个 watermark 都会在下游做一些计算,因此过多的 watermark 会降低程序性能。

6.Watermark 策略与 Kafka 连接器

当使用 Kafka 连接器作为数据源时,每个 Kafka 分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)。

当使用 Kafka 数据源时,多个分区常常并行使用,因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式(这是 Kafka 消费客户端所固有的)。

此时,可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制;将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。

例如,如果每个 Kafka 分区中的事件时间戳严格递增,则使用单调递增时间戳分配器,按分区生成的 watermark 将生成完美的全局 watermark。

注意:在示例中未使用 TimestampAssigner,而是使用了 Kafka 记录自身的时间戳。

案例:使用单 kafka 分区 watermark 生成机制,以及此时 watermark 如何通过 dataflow 传播。

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("my-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(
    kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");
image-20240426142234771
7.算子处理 Watermark 的方式

在将 watermark 转发到下游之前,需要算子对其进行触发的事件完全进行处理;例如,WindowOperator 将首先计算该 watermark 触发的所有窗口数据,当且仅当由此 watermark 触发计算进而生成的所有数据被转发到下游之后,其才会被发送到下游;即由于此 watermark 的出现而产生的所有数据元素都将在此 watermark 之前发出

相同的规则也适用于 TwoInputStreamOperator;此时,算子当前的 watermark 会取其两个输入的最小值。

8.弃用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks

在 Flink 新的 WatermarkStrategyTimestampAssignerWatermarkGenerator 的抽象接口之前,Flink 使用的是 AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks

建议使用新接口,因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式。

9.总结
1.WatermarkStrategy 包含 TimestampAssigner 和 WatermarkGenerator;

2.使用WatermarkStrategy可以在数据源上(建议)也可以在非数据源的操作之后(不建议),但要注意数据源是否支持;

3.处理空闲数据源withIdleness(Duration.ofMinutes(1));

4.处理水位线对齐withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));

5.水位线生成方式分为周期性和标记形式;
2)内置 Watermark 生成器
1.单调递增时间戳分配器

周期性 watermark 生成方式最简单的特例就是给定的数据源中数据的时间戳升序出现;此时,当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。

注意:在 Flink 应用程序中,如果是并行数据源,则只要求并行数据源中的每个单分区数据源任务时间戳递增;例如,设置每一个并行数据源实例都只读取一个 Kafka 分区,则时间戳只需在每个 Kafka 分区内递增即可;Flink 的 watermark 合并机制会在并行数据流进行分发(shuffle)、联合(union)、连接(connect)或合并(merge)时生成正确的 watermark。

WatermarkStrategy.forMonotonousTimestamps();
2.数据之间存在最大固定延迟的时间戳分配器

当 watermark 滞后于数据流中最大(事件时间)时间戳一个固定的时间量;该示例覆盖的场景是预先知道数据流中的数据可能遇到的最大延迟,例如,在测试场景下创建了一个自定义数据源,并且这个数据源产生的数据的时间戳在一个固定范围之内。

Flink 针对上述场景提供了 boundedOutfordernessWatermarks 生成器,该生成器将 maxOutOfOrderness 作为参数,该参数代表在计算给定窗口的结果时,允许元素被忽略计算之前延迟到达的最长时间

其中延迟时长就等于 t - t_w ,其中 t 代表元素的(事件时间)时间戳,t_w 代表前一个 watermark 对应的(事件时间)时间戳;如果 lateness > 0,则认为该元素迟到了,并且在计算相应窗口的结果时默认会被忽略。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

4、用户自定义 Functions-已总结

1)用户自定义函数-已总结
1.实现接口

最基本的方法是实现提供的接口。

# 根据提供的接口创建自定义函数
class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { 
    return Integer.parseInt(value); 
  }
}

# 调用创建的自定义函数
data.map(new MyMapFunction());
2.匿名类

可以将 function 当做匿名类传递。

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { 
     return Integer.parseInt(value); 
  }
});
3.Java 8 Lambdas

Flink 在 Java API 中还支持 Java 8 Lambdas 表达式。

data.filter(s -> s.startsWith("http://"));

data.reduce((i1,i2) -> i1 + i2);
4.Rich functions

所有需要用户自定义 function 的转化操作都可以将 rich function 作为参数。

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { 
    return Integer.parseInt(value); 
  }
}

替换成

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { 
    return Integer.parseInt(value); 
  }
}

并将 function 照常传递给 map transformation。

data.map(new MyMapFunction());

Rich functions 也可以定义成匿名类:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { 
    return Integer.parseInt(value); 
  }
});
2)累加器 和 计数器-已总结
1.概述

累加器是具有加法运算最终累加结果的一种简单结构,可在作业结束后使用

最简单的累加器是计数器: 可以使用 Accumulator.add(V value) 方法将其递增。

在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端,Flink 目前有如下内置累加器,每个都实现了累加器接口。

  • IntCounter , LongCounterDoubleCounter
  • Histogram(直方图): 离散数量的柱状直方图实现;在内部,它只是整形到整形的映射,可以使用它来计算值的分布,例如,单词计数程序的每行单词的分布情况【详见 Metrics】。
2.使用累加器

首先,在需要使用累加器的用户自定义的转换 function 中创建一个累加器对象(此处是计数器)。

private IntCounter numLines = new IntCounter();

其次,必须在 rich function 的 open() 方法中注册累加器对象,也可以在此处定义累加器的名称。

getRuntimeContext().addAccumulator("num-lines", this.numLines);

在操作 function 中的任何位置(包括 open()close() 方法中)使用累加器。

this.numLines.add(1);

最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中(当前只有等待作业完成后执行才起作用)。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器共享一个命名空间,因此可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。

关于累加器和迭代的注意事项:当前累加器的结果只有在整个作业结束后才可用;Flink 计划在下一次迭代中提供上一次的迭代结果;可以使用 聚合器 来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。

3.定制累加器

自定义累加器只需要实现累加器接口,可以选择实现 Accumulator 或 SimpleAccumulator。

Accumulator 的实现十分灵活: 它定义了将要添加的值类型 V,并定义了最终的结果类型 R;例如,对于直方图,V 是一个数字且 R 是一个直方图。

SimpleAccumulator 适用于两种类型都相同的情况,例如计数器。

4.总结
1.通过调用 execute() 方法返回的 JobExecutionResult 对象获得累加器结果(只有等待作业完成后执行才起作用)。

2.单个作业的所有累加器共享一个命名空间,可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。

5、状态与容错-已总结

1)使用状态-已总结
1.Keyed DataStream

使用 keyed state,首先需要为DataStream指定 key(主键);这个 key 用于状态分区(数据流中的 Record 也会被分区)可以使用 DataStream 中 Java/Scala API 的 keyBy(KeySelector) 或者是 Python API 的 key_by(KeySelector) 来指定 key,将生成 KeyedStream

Key selector 函数接收单条 Record 作为输入,返回这条记录的 key,该 key 可以为任何类型,但是它的计算产生方式必须具备确定性,Flink 的数据模型不基于 key-value 对,将数据集在物理上封装成 key 和 value 是没有必要的,Key 是“虚拟”的,用以操纵分组算子。

案例: key selector 函数。

// some ordinary POJO
public class WC {
  public String word;
  public int count;

  public String getWord() { 
    return word; 
  }
}

DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
  .keyBy(WC::getWord);

Flink 有两种不同定义 key 的方式

可以通过 tuple 字段索引,或者选取对象字段的表达式来指定 key 即 Tuple Keys 和 Expression Keys。

2.使用 Keyed State
a)概述

keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下,即这些状态仅可在 KeyedStream 上使用,在Java/Scala API上可以通过 stream.keyBy(...) 得到 KeyedStream,在Python API上可以通过 stream.key_by(...) 得到 KeyedStream

支持的状态类型如下

  • ValueState: 保存一个可以更新和检索的值(每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
  • ListState: 保存一个元素的列表,可以往这个列表中追加数据,并在当前的列表上进行检索,通过 add(T) 或者 addAll(List) 添加元素,通过 Iterable get() 获得整个列表,还可以通过 update(List) 覆盖当前的列表。
  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合,使用 add(T) 增加的元素会用提供的 ReduceFunction 进行聚合。
  • AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合,和 ReducingState 相反的是,聚合类型可能与添加到状态的元素的类型不同,使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
  • MapState: 维护了一个映射列表,可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器,使用 put(UK,UV) 或者 putAll(Map) 添加映射,使用 get(UK) 检索特定 key,使用 entries()keys()values() 分别检索映射、键和值的可迭代视图,还可以通过 isEmpty() 来判断是否包含任何键值对。

所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

状态对象仅用于与状态交互,状态本身不一定存储在内存中,还可能在磁盘或其他位置;从状态中获取的值取决于输入元素所代表的 key,在不同 key 上调用同一个接口,可能得到不同的值。

必须创建一个 StateDescriptor,才能得到对应的状态句柄,它保存了状态名称, 状态所持有值的类型,可能包含用户指定的函数,例如ReduceFunction,根据不同的状态类型,可以创建ValueStateDescriptorListStateDescriptorAggregatingStateDescriptor, ReducingStateDescriptorMapStateDescriptor

状态通过 RuntimeContext 进行访问,只能在 rich functions 中使用,RichFunctionRuntimeContext 提供如下方法

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

案例FlatMapFunction 使用状态

实现了计数窗口,把元组的第一个元素当作 key,该函数将出现的次数以及总和存储在 “ValueState” 中,一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。

注意:会为每个不同的 key(元组中第一个元素)保存一个单独的值。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
b)状态有效期 (TTL)
参数 配置 配置2 配置3
状态的可见性 NeverReturnExpired 不返回过期数据 ReturnExpiredIfNotCleanedUp 会返回过期但未清理的数据
TTL 更新策略 OnCreateAndWrite 仅在创建和写入时更新 OnReadAndWrite 读取和写入时更新
状态清理策略 cleanupFullSnapshot 全量快照时进行清理 cleanupIncrementally 增量数据清理 cleanupInRocksdbCompactFilter RocksDB 压缩过滤器

任何类型的 keyed state 都可以有 有效期 (TTL),如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值,所有状态类型都支持单元素的 TTL,列表元素和映射元素将独立到期。

在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象,然后把配置传递到 state descriptor 中启用 TTL 功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.

原文地址:https://blog.csdn.net/m0_50186249/article/details/140514800

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!