Flink 之 Window 机制详解(下):应用示例与窗口函数
《Flink 之 Window 机制详解(下):应用示例与窗口函数》
一、引言
在前面的文章中,我们已经详细阐述了 Flink 中 Window 机制的基础概念、分类以及其内部的基本工作原理,这些知识为我们深入理解 Window 机制奠定了坚实的基础。在本文中,我们将进一步聚焦于 Window 机制在实际应用中的具体示例,以及深入探讨窗口函数(Windows Function)这一核心组件的丰富内涵与强大功能。通过实际案例与代码示例的结合,我们将更加直观地感受 Window 机制在处理流数据时的卓越表现,以及如何巧妙地运用窗口函数来满足各种复杂多样的业务需求,从而真正掌握这一强大的数据处理工具。
二、Window 机制的应用示例
-
滚动窗口演示:
- 考虑这样一个实际的交通流量监测场景,我们的需求是每 5 秒钟统计一次,最近 5 秒钟内,各个路口通过红绿灯汽车的数量(基于时间的滚动窗口)。在实际操作中,为了更好地观察和理解数据的处理过程,建议将时间间隔修改为 1 分钟。这样,当数据源源不断地流入系统时,例如在 48 分 03 秒时某个路口的信号灯数据被采集并输入到 Flink 系统中,系统会根据滚动窗口的机制,将这些数据纳入到对应的 48 分 - 49 分这个时间窗口内进行统计。由于 Flink 底层采用了特定的算法来确保窗口计算的准确性和高效性,在 49 分整点时,我们就能够准确地获取到这 1 分钟内该路口通过汽车的数量统计结果。这一过程清晰地展示了滚动窗口如何按照固定的时间周期,对数据进行精确的划分和聚合计算,为我们实时了解交通流量状况提供了有力的支持。
-
滑动窗口演示:
- 同样是针对交通路口汽车数量的统计需求,现在我们将其改为每 5 秒钟统计一次,最近 5 秒钟内的情况,但这次采用滑动窗口机制。在这个场景中,由于滑动窗口的特性,数据会被多个窗口所覆盖,这就导致其效果不像滚动窗口那样直观明了。例如,当数据不断流入时,一个数据元素可能会同时被纳入到多个相邻的滑动窗口中进行计算。假设在某一时刻,我们接收到了一系列汽车通过的数据,这些数据会根据滑动窗口的设置,被分配到不同的窗口组合中进行统计。由于窗口之间存在重叠部分,同一个数据可能会在不同的时间点被多次计算,这就需要我们通过手动计算或者借助一些辅助工具来对比和分析不同窗口计算结果之间的差异,从而深入理解滑动窗口在捕捉数据变化趋势方面的独特作用。
此外,以热门话题在社交媒体上的传播热度统计(热词统计)为例,我们可以构建一个数据处理流程,向 kafka 随机发送 50000 个热词(以 200 毫秒的时间间隔发送),然后利用 Flink 的 Window 机制,分别根据滚动窗口和滑动窗口来统计热词的出现频率等相关结果。在编写 Flink 代码实现这一功能时,我们将重点体会 apply 方法的使用方式以及深入感受滚动窗口和滑动窗口在实际数据处理过程中的不同效果。这里需要特别注意的是,apply 和 process 这两个方法都属于处理全量计算的范畴,但在实际工作场景中,process 方法更为常用。这是因为 process 方法更加底层、功能更加强大,它提供了 open/close 等生命周期方法,使得我们能够在窗口的不同生命周期阶段进行自定义的操作,例如在窗口打开时进行一些初始化工作,在窗口关闭时进行资源清理或最终结果的整合处理。同时,process 方法还能够获取 RuntimeContext,这为我们在处理过程中获取更多的运行时信息和上下文环境提供了便利,从而能够更加灵活地应对各种复杂的业务逻辑需求。
三、打印数据的方式
在大数据处理的生态系统中,不同的框架往往采用不同的方式来打印数据,以便开发者能够直观地查看数据处理的结果和中间状态。在 spark 框架中,如果我们想要打印 rdd 的内容,可以使用 foreach(lambda x: print(x)) 这样的语法结构,它会遍历 rdd 中的每个元素并将其打印输出。而对于 spark 的 dataFrame,我们则可以使用 show() 方法来以表格形式展示 dataFrame 的数据内容,这种方式能够清晰地呈现数据的结构和各个字段的值。在 Flink 框架中,当我们想要打印 dataStream 中的数据时,使用 print() 方法即可。这一简单而便捷的方法能够将 dataStream 中的数据实时地打印到控制台,方便我们在开发和调试过程中及时观察数据的流动和处理情况。例如,在我们进行上述热词统计的 Flink 代码中,可以在适当的位置插入 print() 方法来查看每个窗口计算后的热词统计结果,从而验证我们的代码逻辑是否正确以及窗口机制是否按照预期工作。
四、窗口函数(Windows Function)
-
窗口函数的分类:
- 全量函数:窗口先缓存所有元素,等到触发条件后对窗口内的全量元素执行计算。这种计算方式的优势在于它能够获取到窗口内完整的数据信息,从而可以进行一些复杂的计算操作,如对数据进行排序、筛选出特定条件的数据后再进行聚合计算等。例如,在统计一段时间内用户的订单金额时,我们可能需要先对订单按照金额大小进行排序,然后再计算总金额或者平均金额等统计指标,这时候全量函数就能够满足我们的需求。它能够确保在计算之前所有的数据都已经准备就绪,避免了因数据不全而导致的计算错误或不准确的情况。
- 增量函数:窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据。这种方式的特点是计算效率相对较高,因为它不需要等待所有数据都到达后再进行计算,而是在数据流入的过程中逐步更新中间结果。例如,在计算实时数据的累加和时,增量函数可以在每个新数据到来时,直接将其加到已有的累加和中间结果上,从而快速得到最新的累加和。这种方式适用于一些对实时性要求较高且计算逻辑相对简单的场景,能够在不占用过多内存和计算资源的情况下,及时提供数据的初步统计结果。
-
增量聚合函数:
- 指窗口每进入一条数据就计算一次。其中 AggregateFunction 比 ReduceFunction 更加通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。它具有 add 方法可以将一个输入元素添加到一个累加器中,还有创建初始累加器(createAccumulator 方法)、将两个累加器合并到一个累加器(merge 方法)以及从累加器中提取输出(类型为 OUT)的方法。例如,在统计一个数据流中数字的总和时,输入类型可以是 Integer,累加器类型可以是一个包含当前总和的对象,输出类型则为最终的总和结果类型。当一个新的数字数据流入窗口时,add 方法会被调用,将该数字添加到累加器的总和中。如果在分布式环境下有多个分区或子任务在处理数据,merge 方法就会发挥作用,将不同分区的累加器结果合并成一个最终的累加器结果,最后通过从累加器中提取输出的方法得到最终的总和统计值。
-
全量聚合函数:
- 指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算)。
下面通过一个具体的 Java 代码示例来进一步说明全量聚合函数的使用。
五、Java 代码示例:使用全量窗口函数 apply 计算平均成绩
package com.bigdata.day04;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
/**
* @基本功能: 使用全量窗口函数 apply ,计算平均成绩
* @program:FlinkDemo
* @author: 闫哥
* @create:2024-05-15 15:05:24
**/
public class Demo03 {
public static void main(String[] args) throws Exception {
//1. env - 准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2. source - 加载数据
Tuple3[] ENGLISH = new Tuple3[] {
Tuple3.of("class1", "张三", 100L),
Tuple3.of("class1", "李四", 40L),
Tuple3.of("class1", "王五", 60L),
Tuple3.of("class2", "赵六", 20L),
Tuple3.of("class2", "小七", 30L),
Tuple3.of("class2", "小八", 50L)
};
// 先求每个班级的总分数,再求每个班级的总人数
DataStreamSource<Tuple3<String, String, Long>> streamSource = env.fromElements(ENGLISH);
KeyedStream<Tuple3<String, String, Long>, String> keyedStream = streamSource.keyBy(v -> v.f0);
// 每个分区中的数据都达到了 3 条才能触发,哪个分区达到了三条,哪个就触发,不够的不计算
// //Tuple3<String, String, Long> 输入类型
// //Tuple2<Long, Long> 累加器 ACC 类型,保存中间状态 第一个值代表总成绩,第二个值代表总人数
// //Double 输出类型
// 第一个泛型是输入数据的类型,第二个泛型是返回值类型 第三个是 key 的类型, 第四个是窗口对象
keyedStream.countWindow(3).apply(new WindowFunction<Tuple3<String, String, Long>, Double, String, GlobalWindow>() {
@Override
public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, Long>> input, Collector<Double> out) throws Exception {
// 计算总成绩,计算总人数
int sumScore = 0, sumPerson = 0;
for (Tuple3<String, String, Long> tuple3 : input) {
sumScore += tuple3.f2;
sumPerson += 1;
}
out.collect((double) sumScore / sumPerson);
}
}).print();
//5. execute - 执行
env.execute();
}
}
在上述代码中,我们首先通过 StreamExecutionEnvironment.getExecutionEnvironment()
获取 Flink 的执行环境,并设置运行模式为自动(RuntimeExecutionMode.AUTOMATIC
)。然后,我们创建了一个包含学生成绩信息的数据集,其中每个元素是一个 Tuple3
,包含班级、学生姓名和成绩。通过 env.fromElements(ENGLISH)
将数据集加载到 DataStreamSource
中,并使用 keyBy
操作按照班级进行分组,得到 KeyedStream
。接着,我们使用 countWindow(3)
设置了一个计数窗口,当每个班级的分区数据达到 3 条时,就会触发窗口计算。在 apply
方法中,我们定义了全量聚合函数的逻辑,遍历窗口内的所有数据,计算总成绩和总人数,最后计算平均成绩并通过 Collector
收集输出。通过 print()
方法,我们可以将每个班级的平均成绩打印到控制台,以便查看结果。
通过以上对 Flink 中 Window 机制的应用示例以及窗口函数的介绍,我们对其在实际项目中的使用有了更深入的理解,能够更好地运用这一强大的机制来处理流数据中的各种聚合需求。无论是处理实时的交通流量数据、社交媒体热词统计还是其他各种复杂的业务场景,Window 机制都为我们提供了灵活而高效的解决方案,帮助我们从海量的流数据中挖掘出有价值的信息和洞察。
原文地址:https://blog.csdn.net/qq_68076599/article/details/144035929
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!