自学内容网 自学内容网

Flink使用详解

本文档使用 flink-1.13.1 版本依赖

一、Flink 常用流处理 API

getExecutionEnvironment

创建执行环境

StreamExecutionEnvironment environment = 
StreamExecutionEnvironment.getExecutionEnvironment();

setParallelism

为执行环境设置并行度

environment.setParallelism(1);

addSource

给执行环境指定数据来源

// 设置 RabbitMQ 连接配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("192.168.117.4")
        .setPort(5672)
        .setVirtualHost("/")
        .setUserName("mix")
        .setPassword("jovision")
        .build();
// 创建侧输出流
OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};

// 创建 RabbitMQ 数据源,获取名为demo.in队列中的消息
DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
        connectionConfig,
        "demo.in",
        true,
        new SimpleStringSchema()
));

map

用于将接收到的流转化成目标数据类型

// 将所有接受的数据分出两个相同的流对象,侧输出流用于测试分组能力
SingleOutputStreamOperator<TestDto> originStream = rabbitMQStream.map(new MapFunction<String, TestDto>() {
    @Override
    public TestDto map(String s) throws Exception {
        TestDto testDto = JSON.parseObject(s, TestDto.class);
        return testDto;
    }
});

OutputTag

侧输出流,可拷贝来源流并进行其他逻辑操作

// 创建侧输出流
OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};
// 将所有接受的数据分出两个流对象,侧输出流用于测试分组能力
SingleOutputStreamOperator<TestDto> originStream = originStream.process(new ProcessFunction<TestDto, TestDto>() {
    @Override
    public void processElement(TestDto testDto, ProcessFunction<TestDto, TestDto>.Context context, Collector<TestDto> collector) throws Exception {
        // 通过age大小分流
        if(testDto.getAge() > 10){
            // 将流数据塞入上面创建的maxTestStream侧输出流
            context.output(maxTestStream, testDto);
        }else{
            // 将流数据依然放入当前输出流
            collector.collect(testDto);
        }
    }
});

print

打印流信息

originStream.print("sumStream");

filter

用于过滤接收的流

// 过滤接收的数据,只接收age=11的流数据放入sinkStream流对象
SingleOutputStreamOperator<String> sinkStream = originStream.filter(new FilterFunction<TestDto>() {
    @Override
    public boolean filter(TestDto testDto) throws Exception {
        return testDto.getAge() == 11;
    }
}).map(new MapFunction<TestDto, String>() {
    // 转成String
    @Override
    public String map(TestDto testDto) throws Exception {
        return JSON.toJSONString(testDto);
    }
});

getSideOutput

获取测输出流数据

// 上面是在originStream流对象将流数据分给maxTestStream侧输出流的,所以需要如下调用
DataStream<TestDto> sideOutput = originStream.getSideOutput(maxTestStream);

keyBy

将获取到的流对指定属性分组

// 按照对象的name进行分组
KeyedStream<TestDto, Tuple> keyedStream = sideOutput.keyBy("name");

max

分组后返回指定属性最大的值(返回的指定属性最大的值是正确的,但是其中的其他属性可能不准确)

DataStream<TestDto> maxStream = keyedStream.max("age");

maxBy

分组后返回指定属性最大的值(返回的最大值与相应的其他字段内容都是准确的)

DataStream<TestDto> maxByStream = keyedStream.maxBy("age");

sum

分组后返回指定属性累加值

DataStream<TestDto> sumStream = keyedStream.sum("age");

reduce

分组后可操作新数据及上一次数据,可进行累加、比较等逻辑操作

// 每次收到数据会按照分组信息分组,并累加上一次的age数据后存入reduceStream流对象,也可实现其他逻辑
SingleOutputStreamOperator<TestDto> reduceStream = keyedStream.reduce(new ReduceFunction<TestDto>() {
    @Override
    public TestDto reduce(TestDto beforeDto, TestDto nowDto) throws Exception {
        log.info("----------- reduce beforeDto:" + beforeDto);
        log.info("----------- reduce nowDto:" + nowDto);
        return beforeDto.setAge(beforeDto.getAge() + nowDto.getAge());
    }
});

connect

可合并两个不同数据类型的流

// 连接两个不同数据类型的流,类型为String的流对象sinkStream与类型为TestDto的流对象sideOutput合并
ConnectedStreams<String, TestDto> connectStream = sinkStream.connect(sideOutput);
DataStream<Object> connectObjectStream = connectStream.map(new CoMapFunction<String, TestDto, Object>() {
    @Override
    public Object map1(String s) throws Exception {
        // 对第一个sinkStream流对象中的数据进行操作并返回
        return new Tuple2<String,String>("stringStream",s);
    }

    @Override
    public Object map2(TestDto testDto) throws Exception {
        // 对第二个sideOutput流对象中的数据进行操作并返回
        return new Tuple2<String,TestDto>("objectStream",testDto);
    }
});

union

可合并两个相同数据类型的流

// union 的使用(同一种数据类型的流才能用这个方法)
DataStream<TestDto> unionStream = sideOutput.union(originStream);

addSink

为流对象指定数据发送目标

// 设置 RabbitMQ 连接配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("192.168.117.4")
        .setPort(5672)
        .setVirtualHost("/")
        .setUserName("mix")
        .setPassword("jovision")
        .build();
// 将数据发送至名为demo.out的队列
sinkStream.addSink(new RMQSink<>(
        connectionConfig,
        "demo.out",
        new SimpleStringSchema()
));

execute

运行执行环境(在 addSink 之后执行)

// 执行
environment.execute();

二、Flink 连接器(数据源、数据写入)

在 Flink 官网中,当前自带的且常用的连接器如下:
在这里插入图片描述
RabbitMQ 已经有自带的【数据源连接器】以及【数据写入连接器】。
Redis、jdbc 只有自带的【数据写入连接器】,所以数据源连接器需要实现【SourceFunction】接口进行自定义。

RabbitMQ 连接器

数据源连接器(可实时消费消息)

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("192.168.117.4")
        .setPort(5672)
        .setVirtualHost("/")
        .setUserName("mix")
        .setPassword("jovision")
        .build();

// 创建 RabbitMQ 数据源
DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
        connectionConfig,
        "demo.in",
        true,
        new SimpleStringSchema()
));

数据写入连接器

rabbitMQStream.addSink(new RMQSink<>(
        connectionConfig,
        "demo.out",
        new SimpleStringSchema()
));

Kafka 连接器

数据源连接器(可实时消费消息)

Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", "192.168.2.198:9092");
sourceProperties.setProperty("group.id", "test");

// 创建Kafka消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        "vse.unicom.payload.channel", // 源 topic
        new SimpleStringSchema(),     // 数据序列化方式
        sourceProperties              // Kafka消费者配置
);

DataStream<String> sourceStream = environment.addSource(consumer);

数据写入连接器

Properties sinkProperties = new Properties();
sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.198:9092");
sinkProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
sinkProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
sinkProperties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
sinkProperties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
sinkProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
sinkProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
sinkProperties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "4194304");
FlinkKafkaProducer<String> channelProducer = new FlinkKafkaProducer<String>(
        "vse.unicom.payload.tenant", // 目标 topic
        new SimpleStringSchema(),    // 序列化 schema
        sinkProperties);

sourceStream.addSink(channelProducer);

Redis 连接器

数据源连接器(需自定义,单次执行,不可监听 redis 数据变更)

public static class RedisSource implements SourceFunction<Tuple3<String,String,String>> {
    @Override
    public void run(SourceContext<Tuple3<String,String,String>> sourceContext) throws Exception {
        // 连接 Redis
        Jedis jedis = new Jedis("192.168.117.4", 6379);

        // 设置密码(如果需要的话)
        jedis.auth("redis@Abc-1234");
        // 选择数据库
        jedis.select(0);
        for (int i = 0; i < 10; i++){
            String key = "flink:test" + i;
            String key2 = "flink:map" + i;
            jedis.set( key, String.valueOf(new Random().nextInt()));
            sourceContext.collect(new Tuple3<>(key,key2,jedis.get(key)));
        }
    }

    @Override
    public void cancel() {

    }
}

数据写入连接器

public static class CoustomRedisSink implements RedisMapper<Tuple2<String, String>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.SET);
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
        return stringStringTuple2._1();
    }

    @Override
    public String getValueFromData(Tuple2<String, String> stringStringTuple2) {
        return stringStringTuple2._2();
    }
}

jdbc 连接器

数据源连接器(需自定义,单次执行,不可监听 mysql 数据变更)

public static class MysqlSource implements SourceFunction<MeshVcDto> {
    @Override
    public void run(SourceContext<MeshVcDto> sourceContext) throws Exception {
        // 定义数据库连接信息
        String dbURL = "jdbc:mysql://192.168.117.4:3306/jvs_tdms";
        String username = "root";
        String password = "Jo123@My";

        // 连接数据库
        Connection conn = DriverManager.getConnection(dbURL, username, password);

        // 执行查询
        String query = "SELECT id, device_id, mesh_vc, add_time FROM udms_mesh_vc_log where verification_code is null";
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(query);

        // 处理查询结果
        while (rs.next()) {
            Long id = rs.getLong("id");
            String deviceId = rs.getString("device_id");
            String meshVc = rs.getString("mesh_vc");
            Date addTime = rs.getDate("add_time");
            sourceContext.collect(new MeshVcDto().setId(id).setMeshVc(meshVc).setDeviceId(deviceId).setAddTime(addTime));
        }
    }

    @Override
    public void cancel() {

    }
}

数据写入连接器

dataStream.addSink(JdbcSink.sink("update udms_mesh_vc_log set device_id = ?, mesh_vc = ? where id = ?", new JdbcStatementBuilder<MeshVcDto>() {
        @Override
        public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
            preparedStatement.setString(1, meshVcDto.getDeviceId());
            preparedStatement.setString(2, meshVcDto.getMeshVc());
            preparedStatement.setLong(3, meshVcDto.getId());
        }
    },new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("com.mysql.jdbc.Driver")
            .withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
            .withUsername("root")
            .withPassword("Jo123@My")
            .build()));


dataStream.addSink(JdbcSink.sink("insert into udms_mesh_vc_log (device_id, mesh_vc) values (?,?)", new JdbcStatementBuilder<MeshVcDto>() {
        @Override
        public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
            preparedStatement.setString(1, meshVcDto.getDeviceId());
            preparedStatement.setString(2, meshVcDto.getMeshVc());
        }
    },new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("com.mysql.jdbc.Driver")
                    .withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
                    .withUsername("root")
                    .withPassword("Jo123@My")
                    .build()));

三、Flink 常用 Window

窗口计算,需要先分组,然后指定窗口类型,然后编写计算逻辑

  • Window 可以分为两大类:
    • CountWindow:按照指定的数据条数生产一个 Window,只有数据数量有关,分为如下两类
      • 滚动计数窗口 ( Tumbling Count Window),窗口没有重叠
      • 滑动计数窗口 ( Sliding Count Window),窗口有重叠
    • TimeWindow:按照时间生成 Window,根据窗口实现原理的不同分为三类:
      • 滚动时间窗口 ( Tumbling Time Window),窗口没有重叠
      • 滑动时间窗口 ( Sliding Time Window),窗口有重叠
      • 会话窗口 ( Session Window),窗口开始结束时间不固定,在一个固定的时间周期没有新的元素接收,会自动关闭窗口

CountWindow

滚动计数窗口

// 只有当前这个name的消息收到第3次,才会计算前三次的age和,否则不会计算并输出至mq
SingleOutputStreamOperator<String> map1 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
    @Override
    public TestDto map(String s) throws Exception {
        return JSON.parseObject(s, TestDto.class);
    }
}).keyBy("name")
.countWindow(3)
.sum("age").map(new MapFunction<TestDto, String>() {
    @Override
    public String map(TestDto testDto) throws Exception {
        return JSON.toJSONString(testDto);
    }
});

滑动计数窗口

// 只有当前这个name的消息收到第2次,才会计算前5次的和,否则不会计算并输出至mq
SingleOutputStreamOperator<String> map2 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
    @Override
    public TestDto map(String s) throws Exception {
        return JSON.parseObject(s, TestDto.class);
    }
}).keyBy("name")
.countWindow(5, 2)
.sum("age").map(new MapFunction<TestDto, String>() {
    @Override
    public String map(TestDto testDto) throws Exception {
        return JSON.toJSONString(testDto);
    }
});

TimeWindow

滚动时间窗口

注意事项:
触发计算的动作与并行度大小有关系,一个窗口只有接收消息的数量达到并行度之后,才能触发上个窗口的计算并输出
例如:设置为 1 并行度时,[0,15)窗口已经过去,但是不会马上输出计算结果,来到[15,30)窗口,只有接收了 1 条消息,才会触发[0,15)窗口的计算并输出

// 迟到较晚的流数据存储至本侧输出流
OutputTag<TestDto> testDtoOutputTag = new OutputTag<TestDto>("迟到了") {};
// 只有当前这个name的消息一直收到15s,才会计算这15s时间段的age和,否则不会计算并输出至mq
SingleOutputStreamOperator<TestDto> name = rabbitMQStream
        // window截止后继续等2秒,将window范围内的时间加入到计算
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TestDto>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(TestDto testDto) {
                return testDto.getTimeStamp() * 1000;
            }
        })
        .keyBy("name")
        .window(TumblingEventTimeWindows.of(Time.seconds(15)))
        // 迟到数据,在事件时间窗口内的消息,窗口结束后的5秒内收到的消息都被允许
        .allowedLateness(Time.seconds(5))
        // 迟到太多的数据会被放到侧输出流中进行补偿处理
        .sideOutputLateData(testDtoOutputTag)
        .aggregate(new AggregateFunction<TestDto, TestDto, TestDto>() {
            // 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
            @Override
            public TestDto createAccumulator() {
                return new TestDto().setAge(0);
            }
            // 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。
            // 方法传入两个参数:当前新到的数据value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
            @Override
            public TestDto add(TestDto input, TestDto init) {
                return input.setAge(init.getAge() + input.getAge());
            }
            // 从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。
            // 比如之前我们提到的计算平均值,就可以把sum和count作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
            @Override
            public TestDto getResult(TestDto testDto) {
                return testDto.setTimeStamp(new Date().getTime() / 1000).setDate(LocalDateTime.now().toString());
            }

            // 合并两个累加器,并将合并后的状态作为一个累加器返回。
            // 这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
            @Override
            public TestDto merge(TestDto testDto, TestDto acc1) {
                return null;
            }
        }, new ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>() {
            // 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
            @Override
            public void process(Tuple tuple, ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>.Context context, Iterable<TestDto> iterable, Collector<TestDto> collector) throws Exception {
                iterable.forEach(item -> {
                    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    item.setWindowStartTime(context.window().getStart());
                    item.setWindowEndTime(context.window().getEnd());
                    log.info("窗口:[{}, {}) 当前时间:{}, item: {}", format.format(new Date(context.window().getStart())),
                            format.format(new Date(context.window().getEnd())), format.format(new Date()), item);
                    collector.collect(item);
                });
            }
        });

四、Flink 部署

服务启动

● 进入 flink 的 bin 目录,启动 start-cluster.sh

./start-cluster.sh

● 进入 flink 的 conf 目录,在 flink-conf.yaml 文件中查看启动端口(rest.port 参数值)

vi flink-conf.yaml

● 输入服务器 ip 及端口
在这里插入图片描述
● 启动完成

页面操作任务

上传 jar 文件

将使用流处理 API 实现的程序打包为 jar 文件,并上传
在这里插入图片描述
上传成功
在这里插入图片描述

配置启动类并启动任务

在这里插入图片描述
配置完之后点击【Submit】即可执行任务
在这里插入图片描述

停止任务

在这里插入图片描述
在这里插入图片描述

命令操作任务

进入 flink 的 bin 目录下

查看正在执行的任务列表

./flink list

结果展示
在这里插入图片描述

取消任务

● 取消正在执行的指定任务

./flink cancel 【正在执行的任务的id值】-s 【保存点文件夹名】
或
./flink cancel 【正在执行的任务的id值】 (不会有保存点文件)
或
./flink cancel  -s 【正在执行的任务的id值】

例:
./flink cancel 0fa34b7daa017c510adb3692e55d4c96 -s 1234
或
./flink cancel 0fa34b7daa017c510adb3692e55d4c96 (不会有保存点文件)
或
./flink cancel -s 0fa34b7daa017c510adb3692e55d4c96

结果展示
在这里插入图片描述

启动任务

● 无安全点文件启动任务

./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】

例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties

● 有安全点文件启动任务(前提是取消任务时有安全点文件保存)

./flink run -p 【Parallelism】 -s 【执行取消任务的安全点的目录】-c 【EntryClass】[--allowNonRestoredState] 【已上传jar包的路径】 【Program Arguments】

注: --allowNonRestoredState 非必填,可绕过保存点恢复的错误继续启动任务,绕过错误可能会丢失数据,可先不带此配置启动,报错后再加上执行启动命令也可

例:./flink run -p 1 -s file:/tmp/flink/flinkpoint/savepoints/savepoint-029d78-73d8367621d7 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties

结果展示
在这里插入图片描述

启动任务时 jar 包路径可通过以下两种方式获取:
● 上传 jar 包至 linux 系统后直接使用
● 在页面上传文件,并通过以下步骤获取:
○ 在 flink 页面上传文件完成,查看如下内容
在这里插入图片描述
○ 在 linux 中查看上一步拿到的路径,该路径就是已上传的 jar 包的缓存文件
在这里插入图片描述

五、Flink 扩容

flink 扩容是对 TaskManager 数量的扩大,相对应将执行任务的并行度随之扩大。
在这里插入图片描述

flink 任务卡槽设置

  1. 需要在服务启动前配置
  2. flink-conf.yaml 中配置 key/value 的时候在“:”后面需要有一个空格,否则配置不会生效。
  3. 单个 TaskManager 的任务卡槽数量需根据服务器资源配置

进入 flink 的 conf 目录,在 flink-conf.yaml 文件中查看单个 TaskManager 的任务卡槽数量(taskmanager.numberOfTaskSlots 参数值):

vi flink-conf.yaml

flink 任务并行度设置

  1. 需要在任务启动时配置
  2. 修改并行度时不能实时生效,需要重启任务

页面设置

在这里插入图片描述

命令设置

./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】

例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties

原文地址:https://blog.csdn.net/m0_63164811/article/details/143849610

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!