自学内容网 自学内容网

flink学习(14)—— 双流join

概述

Join:内连接

CoGroup:内连接,左连接,右连接

Interval Join:点对面(一条流中的每条数据,与另一条流中同 key 且时间戳落在指定时间区间内的所有数据关联)

Join

1、Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。
2、Join 可以支持处理时间(processing time)和事件时间(event time)两种时间特征。
3、Join 通用用法如下:
    stream.join(otherStream)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)

滚动窗口

package com.bigdata.day07;


import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;


import java.text.ParseException;
import java.time.Duration;
import java.util.Date;

/**
 * 内连接
 * 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2
 */
/**
 * Window inner join of two socket streams over a tumbling event-time window.
 *
 * <p>Both sockets deliver lines shaped like {@code key,value,time}. Records of the two streams with the
 * same key whose event times fall into the same 5-second tumbling window are merged into the triple
 * {@code (key, greenValue, redValue)}. Keys that appear in only one stream within a window produce
 * no output (inner-join semantics).
 */
public class _01_双流join_join_内连接 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so every record passes through the same subtask and the output is easy to follow.
        env.setParallelism(1);

        // Green stream (port 7777)
        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource =
                source.map(lineParser()).assignTimestampsAndWatermarks(eventTimeWatermarks());

        // Red stream (port 7778)
        DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource =
                source2.map(lineParser()).assignTimestampsAndWatermarks(eventTimeWatermarks());

        // Dual-stream join on f0
        DataStream<Tuple3<String, Integer, Integer>> rsSource = greenSource.join(redSource)
                .where(keyOf())
                .equalTo(keyOf())
                // Tumbling window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {
                        return Tuple3.of(first.f0, first.f1, second.f1);
                    }
                });

        redSource.print("红色的流:");
        greenSource.print("绿色的流:");
        rsSource.print("合并后的流:");

        env.execute();
    }

    /** Splits a {@code key,value,time} line into a {@code (key, value, time)} triple. */
    private static MapFunction<String, Tuple3<String, Integer, String>> lineParser() {
        return new MapFunction<String, Tuple3<String, Integer, String>>() {
            @Override
            public Tuple3<String, Integer, String> map(String line) throws Exception {
                String[] split = line.split(",");
                return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
            }
        };
    }

    /** Event-time strategy: timestamp taken from field f2, tolerating up to 3 seconds of disorder. */
    private static WatermarkStrategy<Tuple3<String, Integer, String>> eventTimeWatermarks() {
        return WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((tuple3, recordTimestamp) -> parseEventTime(tuple3.f2));
    }

    /** Selects the join key (field f0). */
    private static KeySelector<Tuple3<String, Integer, String>, String> keyOf() {
        return new KeySelector<Tuple3<String, Integer, String>, String>() {
            @Override
            public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        };
    }

    /**
     * Converts the time field to epoch milliseconds.
     *
     * <p>The hour must be parsed with {@code HH} (hour of day, 0-23). The 12-hour field {@code hh}
     * without an AM/PM marker reads "12" as hour 0, which shifts noon records 12 hours back into the
     * wrong window. Both the dash-separated {@code yyyy-MM-dd HH-mm-ss} form and the colon-separated
     * {@code yyyy-MM-dd HH:mm:ss} form are accepted.
     *
     * @param timeStr the raw time text
     * @return the event time in epoch milliseconds
     * @throws IllegalArgumentException if the text matches neither format
     */
    private static long parseEventTime(String timeStr) {
        try {
            return DateUtils.parseDate(timeStr, "yyyy-MM-dd HH-mm-ss", "yyyy-MM-dd HH:mm:ss").getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse event time: " + timeStr, e);
        }
    }
}

滑动窗口

package com.bigdata.day07;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;


/**
 * @基本功能: 演示join的滑动窗口
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-05-20 09:11:13
 **/
/**
 * Demonstrates a window join over sliding event-time windows.
 *
 * <p>Two socket streams (ports 8899 and 9988) deliver lines like {@code key,0,2021-03-26 12:09:00}.
 * Records sharing a key inside the same 5-second window (sliding every second) are joined into
 * {@code (key, greenValue, orangeValue)}. Because windows overlap, one matching pair can be emitted
 * by several windows.
 *
 * <p>Program: FlinkDemo. Author: 闫哥. Created: 2024-05-20 09:11:13.
 */
public class Demo02Join {

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Parallelism 1, otherwise the effect is hard to observe
        env.setParallelism(1);

        // Green stream: "key,0,2021-03-26 12:09:00" -> triple with event time
        DataStreamSource<String> greenSource = env.socketTextStream("localhost", 8899);
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenDataStream =
                withEventTime(greenSource.map(toTriple()));

        // Orange stream: same line format
        DataStreamSource<String> orangeSource = env.socketTextStream("localhost", 9988);
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> orangeDataStream =
                withEventTime(orangeSource.map(toTriple()));

        //2. source - load data
        //3. transformation - join on f0 within sliding windows
        DataStream<Tuple3<String, Integer, Integer>> resultStream = greenDataStream.join(orangeDataStream)
                .where(tuple3 -> tuple3.f0)
                .equalTo(tuple3 -> tuple3.f0)
                // Sliding window: size 5s, slide 1s
                .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                .apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {
                        return Tuple3.of(first.f0, first.f1, second.f1);
                    }
                });

        //4. sink - output
        greenDataStream.print("绿色的流:");
        orangeDataStream.print("橘色的流:");
        resultStream.print("最终的结果:");

        //5. execute
        env.execute();
    }

    /** Builds the parser turning a {@code key,value,time} line into a triple. */
    private static MapFunction<String, Tuple3<String, Integer, String>> toTriple() {
        return new MapFunction<String, Tuple3<String, Integer, String>>() {
            @Override
            public Tuple3<String, Integer, String> map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Tuple3<>(fields[0], Integer.valueOf(fields[1]), fields[2]);
            }
        };
    }

    /**
     * Attaches event-time timestamps (from field f2) and bounded-out-of-orderness watermarks (3 s).
     * The time field is text, so it must be converted to epoch milliseconds first.
     */
    private static SingleOutputStreamOperator<Tuple3<String, Integer, String>> withEventTime(
            SingleOutputStreamOperator<Tuple3<String, Integer, String>> stream) {
        return stream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> toEpochMillis(element.f2)));
    }

    /** Parses {@code yyyy-MM-dd HH:mm:ss} text (e.g. 2021-03-26 12:09:00) into epoch milliseconds. */
    private static long toEpochMillis(String time) {
        // A fresh formatter per call: SimpleDateFormat is not thread-safe.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            return format.parse(time).getTime();
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }
}

CoGroup

1、优势:可以实现内连接,左连接,右连接
2、劣势:内存压力大
3、和上面的写法区别:将join换成coGroup,apply中实现的具体方法有区别 
4、流程
stream.coGroup(otherStream)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<CoGroupFunction>);

内连接

package com.bigdata.day07;


import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;

/**
 * 内连接
 */
/**
 * Inner join implemented with CoGroup over a tumbling event-time window.
 *
 * <p>Both sockets deliver lines shaped like {@code key,value,time}. For each key and 5-second window the
 * CoGroup function receives every green record and every red record. Emitting one row per (green, red)
 * pair gives inner-join semantics, so a key present in only one stream produces no output.
 */
public class _02_双流join_CoGroup_内连接 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so every record passes through the same subtask and the output is easy to follow.
        env.setParallelism(1);

        // Green stream (port 7777)
        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource =
                source.map(lineParser()).assignTimestampsAndWatermarks(eventTimeWatermarks());

        // Red stream (port 7778)
        DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource =
                source2.map(lineParser()).assignTimestampsAndWatermarks(eventTimeWatermarks());

        // CoGroup on f0 within tumbling windows
        DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource)
                .where(keyOf())
                .equalTo(keyOf())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {
                        // Cartesian product of the two groups; empty when either side has no records.
                        for (Tuple3<String, Integer, String> greenTuple3 : first) {
                            for (Tuple3<String, Integer, String> redTuple3 : second) {
                                out.collect(Tuple3.of(greenTuple3.f0, "green" + greenTuple3.f1, "red" + redTuple3.f1));
                            }
                        }
                    }
                });

        redSource.print("红色的流:");
        greenSource.print("绿色的流:");
        rsSource.print("合并后的流:");

        env.execute();
    }

    /** Splits a {@code key,value,time} line into a {@code (key, value, time)} triple. */
    private static MapFunction<String, Tuple3<String, Integer, String>> lineParser() {
        return new MapFunction<String, Tuple3<String, Integer, String>>() {
            @Override
            public Tuple3<String, Integer, String> map(String line) throws Exception {
                String[] split = line.split(",");
                return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
            }
        };
    }

    /** Event-time strategy: timestamp taken from field f2, tolerating up to 3 seconds of disorder. */
    private static WatermarkStrategy<Tuple3<String, Integer, String>> eventTimeWatermarks() {
        return WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((tuple3, recordTimestamp) -> parseEventTime(tuple3.f2));
    }

    /** Selects the join key (field f0). */
    private static KeySelector<Tuple3<String, Integer, String>, String> keyOf() {
        return new KeySelector<Tuple3<String, Integer, String>, String>() {
            @Override
            public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        };
    }

    /**
     * Converts the time field to epoch milliseconds.
     *
     * <p>The hour must be parsed with {@code HH} (hour of day, 0-23). The 12-hour field {@code hh}
     * without an AM/PM marker reads "12" as hour 0, which shifts noon records 12 hours back into the
     * wrong window. Both {@code yyyy-MM-dd HH-mm-ss} and {@code yyyy-MM-dd HH:mm:ss} are accepted.
     *
     * @param timeStr the raw time text
     * @return the event time in epoch milliseconds
     * @throws IllegalArgumentException if the text matches neither format
     */
    private static long parseEventTime(String timeStr) {
        try {
            return DateUtils.parseDate(timeStr, "yyyy-MM-dd HH-mm-ss", "yyyy-MM-dd HH:mm:ss").getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse event time: " + timeStr, e);
        }
    }
}

外连接

package com.bigdata.day07;


import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;

/**
 * 外连接
 */
/**
 * Left outer join implemented with CoGroup over a tumbling event-time window.
 *
 * <p>Both sockets deliver lines shaped like {@code key,value,time}. The green stream is the left side.
 * Every green record in a 5-second window is emitted once per matching red record. If there is no
 * matching red record, it is emitted once with {@code "red null"}. Red records without a green partner
 * are dropped.
 */
public class _03_双流join_CoGroup_外连接 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so every record passes through the same subtask and the output is easy to follow.
        env.setParallelism(1);

        // Green stream (port 7777), left side of the join
        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource =
                source.map(lineParser()).assignTimestampsAndWatermarks(eventTimeWatermarks());

        // Red stream (port 7778), right side of the join
        DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource =
                source2.map(lineParser()).assignTimestampsAndWatermarks(eventTimeWatermarks());

        DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource)
                .where(keyOf())
                .equalTo(keyOf())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {
                        // Inner, left and right joins differ only in this nested loop. Dropping the
                        // "no match" branch gives an inner join. Swapping the roles of first/second
                        // gives a right join.
                        for (Tuple3<String, Integer, String> greenTuple3 : first) {
                            boolean isExist = false;
                            for (Tuple3<String, Integer, String> redTuple3 : second) {
                                isExist = true;
                                out.collect(Tuple3.of(greenTuple3.f0, "green" + greenTuple3.f1, "red" + redTuple3.f1));
                            }
                            if (!isExist) {
                                out.collect(Tuple3.of(greenTuple3.f0, "green" + greenTuple3.f1, "red null"));
                            }
                        }
                    }
                });

        redSource.print("红色的流:");
        greenSource.print("绿色的流:");
        rsSource.print("合并后的流:");

        env.execute();
    }

    /** Splits a {@code key,value,time} line into a {@code (key, value, time)} triple. */
    private static MapFunction<String, Tuple3<String, Integer, String>> lineParser() {
        return new MapFunction<String, Tuple3<String, Integer, String>>() {
            @Override
            public Tuple3<String, Integer, String> map(String line) throws Exception {
                String[] split = line.split(",");
                return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
            }
        };
    }

    /** Event-time strategy: timestamp taken from field f2, tolerating up to 3 seconds of disorder. */
    private static WatermarkStrategy<Tuple3<String, Integer, String>> eventTimeWatermarks() {
        return WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((tuple3, recordTimestamp) -> parseEventTime(tuple3.f2));
    }

    /** Selects the join key (field f0). */
    private static KeySelector<Tuple3<String, Integer, String>, String> keyOf() {
        return new KeySelector<Tuple3<String, Integer, String>, String>() {
            @Override
            public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        };
    }

    /**
     * Converts the time field to epoch milliseconds.
     *
     * <p>The hour must be parsed with {@code HH} (hour of day, 0-23). The 12-hour field {@code hh}
     * without an AM/PM marker reads "12" as hour 0, which shifts noon records 12 hours back into the
     * wrong window. Both {@code yyyy-MM-dd HH-mm-ss} and {@code yyyy-MM-dd HH:mm:ss} are accepted.
     *
     * @param timeStr the raw time text
     * @return the event time in epoch milliseconds
     * @throws IllegalArgumentException if the text matches neither format
     */
    private static long parseEventTime(String timeStr) {
        try {
            return DateUtils.parseDate(timeStr, "yyyy-MM-dd HH-mm-ss", "yyyy-MM-dd HH:mm:ss").getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse event time: " + timeStr, e);
        }
    }
}

Interval Join

1、Join 以及 CoGroup 都是窗口 Join,必须指定窗口。
2、Interval Join不需要给窗口。Interval Join 必须先分组才能使用。 
3、先对数据源进行keyBy
4、 外流.intervalJoin(内流)
        .between(-2,2)
        .process

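between 的上下界默认都是包含的(闭区间),可以通过 .lowerBoundExclusive() / .upperBoundExclusive() 改为不包含。
内流(intervalJoin 的参数)对应下面代码中的红色流:对外流的每一条数据,内流中同 key 且时间戳落在 [外流数据时间戳 + 下界, 外流数据时间戳 + 上界] 内的每一条数据,都会与它组成一对,交给 process 处理。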

 代码实现

package com.bigdata.day07;


import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;


/**
 * Interval join of two keyed socket streams.
 *
 * <p>Both sockets deliver lines shaped like {@code key,value,time}. Unlike Join/CoGroup no window is
 * specified, but both inputs must be keyed first. Each green (left) record is paired with every red
 * (right) record of the same key whose event time lies in {@code [greenTime - 2s, greenTime + 2s]}.
 */
public class _04_双流join_Interval_Join {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so every record passes through the same subtask and the output is easy to follow.
        env.setParallelism(1);

        // Green stream (port 7777): parse -> watermarks -> keyBy
        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        KeyedStream<Tuple3<String, Integer, String>, String> greenSource = source
                .map(lineParser())
                .assignTimestampsAndWatermarks(eventTimeWatermarks())
                .keyBy(keyOf());

        // Red stream (port 7778): parse -> watermarks -> keyBy
        DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);
        KeyedStream<Tuple3<String, Integer, String>, String> redSource = source2
                .map(lineParser())
                .assignTimestampsAndWatermarks(eventTimeWatermarks())
                .keyBy(keyOf());

        // Interval join. Both bounds are inclusive by default (.lowerBoundExclusive() /
        // .upperBoundExclusive() change that). For a green record at time t, red records with the same key
        // and timestamps in [t - 2s, t + 2s] are joined; e.g. a green record at 10s matches red records
        // stamped 8s through 12s.
        SingleOutputStreamOperator<String> rsSource = greenSource
                .intervalJoin(redSource)
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {
                    @Override
                    public void processElement(Tuple3<String, Integer, String> left, Tuple3<String, Integer, String> right, ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect("left中的key:"+left.f0+",value="+left.f1+",time="+left.f2+",right中的key:"+right.f0+",value="+right.f1+",time="+right.f2);
                    }
                });

        redSource.print("红色的流:");
        greenSource.print("绿色的流:");
        rsSource.print("合并后的流:");

        env.execute();
    }

    /** Splits a {@code key,value,time} line into a {@code (key, value, time)} triple. */
    private static MapFunction<String, Tuple3<String, Integer, String>> lineParser() {
        return new MapFunction<String, Tuple3<String, Integer, String>>() {
            @Override
            public Tuple3<String, Integer, String> map(String line) throws Exception {
                String[] split = line.split(",");
                return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
            }
        };
    }

    /** Event-time strategy: timestamp taken from field f2, tolerating up to 3 seconds of disorder. */
    private static WatermarkStrategy<Tuple3<String, Integer, String>> eventTimeWatermarks() {
        return WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((tuple3, recordTimestamp) -> parseEventTime(tuple3.f2));
    }

    /** Selects the grouping/join key (field f0). */
    private static KeySelector<Tuple3<String, Integer, String>, String> keyOf() {
        return new KeySelector<Tuple3<String, Integer, String>, String>() {
            @Override
            public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
                return tuple3.f0;
            }
        };
    }

    /**
     * Converts the time field to epoch milliseconds.
     *
     * <p>The hour must be parsed with {@code HH} (hour of day, 0-23). The 12-hour field {@code hh}
     * without an AM/PM marker reads "12" as hour 0, which shifts noon records 12 hours back. Both
     * {@code yyyy-MM-dd HH-mm-ss} and {@code yyyy-MM-dd HH:mm:ss} are accepted.
     *
     * @param timeStr the raw time text
     * @return the event time in epoch milliseconds
     * @throws IllegalArgumentException if the text matches neither format
     */
    private static long parseEventTime(String timeStr) {
        try {
            return DateUtils.parseDate(timeStr, "yyyy-MM-dd HH-mm-ss", "yyyy-MM-dd HH:mm:ss").getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse event time: " + timeStr, e);
        }
    }
}


原文地址:https://blog.csdn.net/weixin_52642840/article/details/144183923

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!