自学内容网 自学内容网

Flink四大基石之窗口(Window)使用详解

目录

一、引言

二、为什么需要 Window

三、Window 的控制属性

窗口的长度(大小)

窗口的间隔

四、Flink 窗口应用代码结构

是否分组

Keyed Window --键控窗

Non-Keyed Window

核心操作流程

五、Window 的生命周期

分配阶段

触发计算

六、Window 的分类

滚动窗口- TumblingWindow概念

滑动窗口– SlidingWindow概念

会话窗口 [了解]

七、Windows Function 窗口函数

分类剖析

增量聚合函数(以 AggregateFunction 为例)

全量聚合函数

八、案例实战

案例一

滚动窗口演示        

滑动窗口演示

热词统计案例

kafka发送消息的模板代码

九、总结


        本文深入探讨 Flink 中高级 API 里窗口(Window)的相关知识,涵盖为什么需要窗口、其控制属性、应用代码结构、生命周期、分类,以及窗口函数的各类细节,并辅以实例进行讲解,旨在助力开发者透彻理解并熟练运用 Flink 的窗口机制处理流数据。

一、引言

        在大数据实时处理领域,Apache Flink 凭借其卓越性能与丰富功能占据重要地位。而窗口(Window)作为 Flink 从流处理(Streaming)到批处理(Batch)的关键桥梁,理解与掌握其使用对高效数据处理意义非凡,接下来将全方位剖析其奥秘。

二、为什么需要 Window

        在流处理场景中,数据如潺潺溪流般持续涌入、无休无止。但诸多业务场景要求我们对特定时段数据做聚合操作,像统计 “过去的 1 分钟内有多少用户点击了我们的网页”。若不划定范围,面对无尽数据洪流,根本无法开展有针对性计算。窗口恰似神奇 “箩筐”,按规则收集一定时长或一定数据量数据,将无限流拆分成有限 “桶”,便于精准计算,满足如 “每隔 10min,计算最近 24h 的热搜词” 这类实时需求。

三、Window 的控制属性

窗口的长度(大小)

        明确要计算最近多久的数据,以时间维度举例,若关注 24 小时内热搜词数据量,那 24 小时即窗口长度;计数维度下,设定统计前 N 条数据,N 就是计数窗口的长度规格。

窗口的间隔

        决定隔多久进行一次计算操作。像 “每隔 10min,计算最近 24h 的热搜词” 里,每隔 10 分钟便是间隔设定,它把控着计算频次节奏。

四、Flink 窗口应用代码结构

是否分组

        首先要判定是否依 Key 对 DataStream 分组,经 keyBy 操作后,数据流成多组,下游算子多实例可并行跑,提效显著;若用 windowAll 则不分组,所有数据送下游单个实例(并行度为 1),后续窗口操作逻辑与分组情形(Keyed Window)类似,仅执行主体有别。

Keyed Window --键控窗

// Keyed Window
stream
        .keyBy(...)              <-  按照一个Key进行分组
        .window(...)            <-  将数据流中的元素分配到相应的窗口中
        [.trigger(...)]            <-  指定触发器Trigger(可选)
        [.evictor(...)]            <-  指定清除器Evictor(可选)
        .reduce/aggregate/process/apply()      <-  窗口处理函数Window Function

Non-Keyed Window

// Non-Keyed Window
stream
        .windowAll(...)         <-  不分组,将数据流中的所有元素分配到相应的窗口中
        [.trigger(...)]            <-  指定触发器Trigger(可选)
        [.evictor(...)]            <-  指定清除器Evictor(可选)
        .reduce/aggregate/process()      <-  窗口处理函数Window Function

核心操作流程

        借助窗口分配器(WindowAssigner)依时间(Event Time 或 Processing Time)把数据流元素 “分拣” 进对应窗口;待满足触发条件(常是窗口结束时间到等情况),用窗口处理函数(如 reduce、aggregate、process 等常用函数)处理窗口内数据,此外,trigger、evictor 是面向高级自定义需求的触发、销毁附加项,默认配置也能应对常见场景。

五、Window 的生命周期

分配阶段

        窗口分配器依据设定规则(像按时间间隔、计数规则等),为流入数据 “找家”,安置到合适窗口 “桶” 内,确定数据归属,构建基础计算单元。

触发计算

        当预设触发条件达成,如时间窗口到结束点,对应窗口函数 “登场”,对窗口内数据按既定逻辑聚合处理,不同窗口函数(reduce、aggregate、process)处理细节、能力有差异,像 process 更底层、功能更强大,自带 open/close 生命周期方法且能获取 RuntimeContext。

        上图是窗口的生命周期示意图,假如我们设置的是一个10分钟的滚动窗口,第一个窗口的起始时间是0:00,结束时间是0:10,后面以此类推。当数据流中的元素流入后,窗口分配器会根据时间(Event Time或Processing Time)分配给相应的窗口。相应窗口满足了触发条件,比如已经到了窗口的结束时间,会触发相应的Window Function进行计算。注意,本图只是一个大致示意图,不同的Window Function的处理方式略有不同。

 

        从数据类型上来看,一个DataStream经过keyBy转换成KeyedStream,再经过window转换成WindowedStream,我们要在之上进行reduce、aggregate或process等Window Function,对数据进行必要的聚合操作。

六、Window 的分类

Window可以分成两类:

CountWindow按指定数据条数生成窗口,与时间脱钩。

滚动计数窗口:每隔 N 条数据,聚焦统计前 N 条,如每来 10 条统计前 10 条信息。

滑动计数窗口:每隔 N 条数据,统计前 M 条(N≠M),像每过 20 条统计前 15 条情况。

TimeWindow(重点):基于时间划定窗口。

滚动时间窗口:每隔 N 时间,统计前 N 时间范围数据,如每隔 5 分钟统计前 5 分钟车辆通过量,窗口长度与滑动距离均为 5 分钟。

滑动时间窗口:每隔 N 时间,统计前 M 时间范围数据(M≠N),像每隔 30 秒统计前 1 分钟车辆数据,窗口长度 1 分钟、滑动距离 30 秒。

会话窗口:设会话超时时间(如 10 分钟),期间无数据来则结算上一窗口数据,按毫秒精细界定范围,与 Key 值关联紧密,Key 值无新输入达设定时长就统计,不受全局新数据流入干扰。

滚动窗口- TumblingWindow概念

流是连续的,无界的(有明确的开始,无明确的结束

假设有个红绿灯,提出个问题:计算一下通过这个路口的汽车数量

对于这个问题,肯定是无法回答的,为何?

因为,统计是一种对固定数据进行计算的动作。

因为流的数据是源源不断的,无法满足固定数据的要求(因为不知道何时结束)

那么,我们换个问题:统计1分钟内通过的汽车数量

那么,对于这个问题,我们就可以解答了。因为这个问题确定了数据的边界,从无界的流数据中,取出了一部分有边界的数据子集合进行计算。

描述完整就是:每隔1分钟,统计这1分钟内通过汽车的数量。窗口长度是1分钟,时间间隔是1分钟,所以这样的窗口就是滚动窗口。

那么,这个行为或者说这个统计的数据边界,就称之为窗口。

同时,我们的问题,是以时间来划分被处理的数据边界的,那么按照时间划分边界的就称之为:时间窗口

反之,如果换个问题,统计100辆通过的车里面有多少宝马品牌,那么这个边界的划分就是按照数量的,这样的称之为:计数窗口

同时,这样的窗口被称之为滚动窗口,按照窗口划分依据分为:滚动时间窗口、滚动计数窗口。

滑动窗口– SlidingWindow概念

同样是需求,改为:

每隔1分钟,统计前面2分钟内通过的车辆数

对于这个需求我们可以看出,窗口长度是2分钟,每隔1分钟统计一次,窗口长度和时间间隔不相等,并且是大于关系,就是滑动窗口

或者:每通过100辆车,统计前面通过的50辆车的品牌占比

对于这个需求可以看出,窗口长度是50辆车,但是每隔100辆车统计一次

对于这样的窗口,我们称之为滑动窗口。

那么在这里面,统计多少数据是窗口长度(如统计2分钟内的数据,统计50辆车中的数据)

隔多久统计一次称之为滑动距离(如,每隔1分钟,每隔100辆车)

那么可以看出,滑动窗口,就是滑动距离不等于窗口长度的一种窗口

比如,每隔1分钟 统计先前5分钟的数据,窗口长度5分钟,滑动距离1分钟,不相等

比如,每隔100条数据,统计先前50条数据,窗口长度50条,滑动距离100条,不相等

那如果相等呢?相等就是比如:每隔1分钟统计前面1分钟的数据,窗口长度1分钟,滑动距离1分钟,相等。

对于这样的需求可以简化成:每隔1分钟统计一次数据,这就是前面说的滚动窗口

那么,我们可以看出:

滚动窗口:窗口长度= 滑动距离

滑动窗口:窗口长度!= 滑动距离

总结:其中可以发现,对于滑动窗口:

滑动距离> 窗口长度,会漏掉数据,比如:每隔5分钟,统计前面1分钟的数据(滑动距离5分钟,窗口长度1分钟,漏掉4分钟的数据)这样的东西,没人用。

滑动距离< 窗口长度,会重复处理数据,比如:每隔1分钟,统计前面5分钟的数据(滑动距离1分钟,窗口长度5分钟,重复处理4分钟的数据)

滑动距离= 窗口长度,不漏也不会重复,也就是滚动窗口

窗口的长度(大小) > 窗口的间隔 : 如每隔5s, 计算最近10s的数据 【滑动窗口】

窗口的长度(大小) = 窗口的间隔: 如每隔10s,计算最近10s的数据 【滚动窗口】

窗口的长度(大小) < 窗口的间隔: 每隔15s,计算最近10s的数据 【没有名字,不用】

会话窗口 [了解]

Session 会话,一次会话。就是谈话。

设置一个会话超时时间间隔即可, 如10分钟,那么表示:

如果10分钟没有数据到来, 就计算上一个窗口的数据

代码中,并行度设置为1,测试比较 方便。

窗口的范围:

窗口的判断是按照毫秒为单位

如果窗口长度是5秒

窗口的开始: start

窗口的结束: start + 窗口长度 -1 毫秒

比如窗口长度是5秒, 从0开始

那么窗口结束是: 0 + 5000 -1 = 4999

七、Windows Function 窗口函数

分类剖析

全量函数:耐心缓存窗口所有元素,直至触发条件成熟,才对全量数据 “开刀” 计算,此特性可满足数据排序等复杂需求。

增量函数:保存中间数据 “蓝本”,新元素流入就与之融合更新,持续迭代中间成果,高效且灵活。

增量聚合函数(以 AggregateFunction 为例)

        每有新数据 “入局”,立马按规则计算,其接口含输入类型(IN)、累加器类型(ACC)、输出类型(OUT)参数,有对应 add、createAccumulator、merge、extractOutput 等方法,构建严谨聚合流程。

实现方法(常见的增量聚合函数如下):
reduce(reduceFunction)
aggregate(aggregateFunction)
sum()
min()
max()

reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个 <T>
maxBy、minBy、sum这3个底层都是由reduce实现的
aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有<T, ACC, R>

AggregateFunction 【了解】

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)

输入类型是输入流中的元素类型,AggregateFunction有一个add方法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。

package com.bigdata.windows;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class _04_AggDemo {

    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L)
    };

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //2. source-加载数据
        DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);

        KeyedStream<Tuple3<String,String,Long>, String> keyedStream = dataStreamSource.keyBy(new KeySelector<Tuple3<String,String,Long>, String>() {

            @Override
            public String getKey(Tuple3<String,String,Long> tuple3) throws Exception {
                return tuple3.f0;
            }
        });

        //3. transformation-数据处理转换
        // 三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)
        keyedStream.countWindow(3).aggregate(new AggregateFunction<Tuple3<String,String,Long>, Tuple3<String,Long,Integer>, Tuple2<String,Double>>() {

            // 初始化一个中间变量
            Tuple3<String,Long,Integer> tuple3 = Tuple3.of(null,0L,0);
            @Override
            public Tuple3<String,Long,Integer> createAccumulator() {
                return tuple3;
            }

            @Override
            public Tuple3<String,Long,Integer> add(Tuple3<String, String, Long> value, Tuple3<String,Long,Integer> accumulator) {
                long tempScore = value.f2 + accumulator.f1;
                int length = accumulator.f2 + 1;
                return Tuple3.of(value.f0, tempScore,length);
            }

            @Override
            public Tuple2<String, Double> getResult( Tuple3<String,Long,Integer> accumulator) {
                return Tuple2.of(accumulator.f0,(double) accumulator.f1 / accumulator.f2);
            }

            @Override
            public Tuple3<String, Long, Integer> merge(Tuple3<String, Long, Integer> a, Tuple3<String, Long, Integer> b) {
                return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);
            }


        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

全量聚合函数

        坚守等窗口数据集齐 “发令枪响” 才运算原则,确保计算基于完整数据集,保障结果准确性、完整性,契合多场景聚合诉求。

实现方法
apply(windowFunction)
process(processWindowFunction)

全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。
ProcessWindowFunction一次性迭代整个窗口里的所有元素,比较重要的一个对象是Context,可以获取到事件和状态信息,这样我们就可以实现更加灵活的控制,该算子会浪费很多性能,主要原因是不增量计算,要缓存整个窗口然后再去处理,所以要设计好内存。

package com.bigdata.day04;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;


public class Demo03 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        Tuple3[] ENGLISH = new Tuple3[] {
                Tuple3.of("class1", "张三", 100L),
                Tuple3.of("class1", "李四", 40L),
                Tuple3.of("class1", "王五", 60L),
                Tuple3.of("class2", "赵六", 20L),
                Tuple3.of("class2", "小七", 30L),
                Tuple3.of("class2", "小八", 50L)
        };
        // 先求每个班级的总分数,再求每个班级的总人数
        DataStreamSource<Tuple3<String,String,Long>> streamSource = env.fromElements(ENGLISH);
        KeyedStream<Tuple3<String, String, Long>, String> keyedStream = streamSource.keyBy(v -> v.f0);
        // 每个分区中的数据都达到了3条才能触发,哪个分区达到了三条,哪个就触发,不够的不计算
        // //Tuple3<String, String, Long> 输入类型
        //    //Tuple2<Long, Long> 累加器ACC类型,保存中间状态 第一个值代表总成绩,第二个值代表总人数
        //    //Double 输出类型
        // 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象
        keyedStream.countWindow(3).apply(new WindowFunction<Tuple3<String, String, Long>, Double, String, GlobalWindow>() {
            @Override
            public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, Long>> input, Collector<Double> out) throws Exception {
                // 计算总成绩,计算总人数
                int sumScore = 0,sumPerson=0;
                for (Tuple3<String, String, Long> tuple3 : input) {
                    sumScore += tuple3.f2;
                    sumPerson += 1;
                }
                out.collect((double)sumScore/sumPerson);
            }
        }).print();


        //5. execute-执行
        env.execute();
    }
}

八、案例实战

案例一

 需求为 “每 5 秒钟统计一次,最近 5 秒钟内,各个路口通过红绿灯汽车的数量”,借 Flink 代码实现,底层算法作用下,数据按节奏聚合统计,时间设 1 分钟更易观察效果,能清晰看到各时段车辆数统计产出。

nc -lk 9999
有如下数据表示:
信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4

没有添加窗口的写法:

package com.bigdata.day03.time;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Demo07 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 9,2 --> (9,2)
        //3. transformation-数据处理转换
        streamSource.map(new MapFunction<String, Tuple2<Integer,Integer>>() {

            @Override
            public Tuple2<Integer, Integer> map(String line) throws Exception {
                String[] arr = line.split(",");
                int monitor_id = Integer.valueOf(arr[0]);
                int num = Integer.valueOf(arr[1]);
                return Tuple2.of(monitor_id,num);
            }
        }).keyBy(tuple->tuple.f0).sum(1).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

此处的sum求和,中count ,其实是CartInfo中的一个字段而已。

演示:

滚动窗口演示
        

package com.bigdata.day03.time;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;


public class Demo08 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 9,2 --> (9,2)
        //3. transformation-数据处理转换
        streamSource.map(new MapFunction<String, Tuple2<Integer,Integer>>() {

            @Override
            public Tuple2<Integer, Integer> map(String line) throws Exception {
                String[] arr = line.split(",");
                int monitor_id = Integer.valueOf(arr[0]);
                int num = Integer.valueOf(arr[1]);
                return Tuple2.of(monitor_id,num);
            }
        }).keyBy(tuple->tuple.f0)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                        .sum(1).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

        以上代码的时间最好修改为1分钟,假如时间间隔是1分钟,那么48分03秒时输入的信号灯数据,49分整点会统计出来结果,原因是底层有一个算法。

        滑动窗口的话,不太容易看到效果,因为有些数据被算到了多个窗口中,需要我们拿笔自己计算一下,对比一下:

滑动窗口演示

        同样统计各路口汽车数量,但需求改为 “每 5 秒钟统计一次,最近 10 秒钟内”,因数据会在多窗口重复计算,需手动比对梳理,深入体会滑动窗口数据处理逻辑与特点。

package com.bigdata.day03.time;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;


public class Demo09 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 9,2 --> (9,2)
        //3. transformation-数据处理转换
        streamSource.map(new MapFunction<String, Tuple2<Integer,Integer>>() {

            @Override
            public Tuple2<Integer, Integer> map(String line) throws Exception {
                String[] arr = line.split(",");
                int monitor_id = Integer.valueOf(arr[0]);
                int num = Integer.valueOf(arr[1]);
                return Tuple2.of(monitor_id,num);
            }
        }).keyBy(tuple->tuple.f0)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                        .sum(1).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

热词统计案例

        借助 Kafka 随机发送 50000 个热词(200 毫秒间隔),分别基于滚动、滑动窗口统计,编写 Flink 代码时着重体会 apply 方法,兼顾二者效果差异,同时知晓工作中 process 函数因更强大底层能力常成首选。

apply和process都是处理全量计算,但工作中正常用process。

process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。

package com.bigdata.day03.time;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class Demo10 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","bigdata01:9092");
        properties.setProperty("group.id","g2");

        //2. source-加载数据
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("flink-01",new SimpleStringSchema(),properties);
        DataStreamSource<String> kafkaSource = env.addSource(kafkaConsumer);
        //3. transformation-数据处理转换
        kafkaSource.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value,1);
            }
        }).keyBy(tuple->tuple.f0)
                //.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
                // 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象
                .apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key,  // 代表分组key值     五旬老太守国门
                                      TimeWindow window, // 代表窗口对象
                                      Iterable<Tuple2<String, Integer>> input, // 分组过之后的数据 [1,1,1,1,1]
                                      Collector<String> out  // 用于输出的对象
                    ) throws Exception {
                        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long start = window.getStart();
                        long end = window.getEnd();
                        int sum = 0;
                        for (Tuple2<String, Integer> tuple2 : input) {
                            sum += tuple2.f1;
                        }

                        out.collect(key+",窗口开始:"+dateFormat.format(new Date(start))+",结束时间:"+dateFormat.format(new Date(end))+","+sum);
                        //out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);

                    }
                }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

kafka发送消息的模板代码

package com.bigdata.day03.time;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class CustomProducer {

    public static void main(String[] args) {

        // Properties 它是map的一种
        Properties properties = new Properties();
        // 设置连接kafka集群的ip和端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 创建了一个消息生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 调用这个里面的send方法
        String[] hotWords= new String[]{"郭有才","歌手2024","五旬老太守国门","师夷长技以制夷"};

        Random random = new Random();
        for (int i = 0; i < 50000; i++) {
            String word = hotWords[random.nextInt(4)];
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("flink-01",word);
            kafkaProducer.send(producerRecord);
        }

        kafkaProducer.close();
    }
}

九、总结

        Flink 窗口机制犹如精密仪器,从控制属性、分类设计到函数运用,各环节紧密相扣。深入理解其原理、熟练实操代码,能为实时流数据处理注入强大动力,解锁更多高效、智能数据聚合分析场景,助力开发者在大数据浪潮中稳立潮头、驾驭数据。后续可深入探索自定义窗口逻辑、优化性能调优等进阶方向,深挖 Flink 窗口潜力。


原文地址:https://blog.csdn.net/weixin_64726356/article/details/144094102

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!