Flink使用详解
本文档使用 flink-1.13.1 版本依赖
一、Flink 常用流处理 API
getExecutionEnvironment
创建执行环境
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
setParallelism
为执行环境设置并行度
environment.setParallelism(1);
addSource
给执行环境指定数据来源
// 设置 RabbitMQ 连接配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("192.168.117.4")
.setPort(5672)
.setVirtualHost("/")
.setUserName("mix")
.setPassword("jovision")
.build();
// 创建侧输出流
OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};
// 创建 RabbitMQ 数据源,获取名为demo.in队列中的消息
DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
connectionConfig,
"demo.in",
true,
new SimpleStringSchema()
));
map
用于将接收到的流转化成目标数据类型
// 将所有接受的数据分出两个相同的流对象,侧输出流用于测试分组能力
SingleOutputStreamOperator<TestDto> originStream = rabbitMQStream.map(new MapFunction<String, TestDto>() {
@Override
public TestDto map(String s) throws Exception {
TestDto testDto = JSON.parseObject(s, TestDto.class);
return testDto;
}
});
OutputTag
侧输出流,可拷贝来源流并进行其他逻辑操作
// 创建侧输出流
OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};
// 将所有接受的数据分出两个流对象,侧输出流用于测试分组能力
SingleOutputStreamOperator<TestDto> originStream = originStream.process(new ProcessFunction<TestDto, TestDto>() {
@Override
public void processElement(TestDto testDto, ProcessFunction<TestDto, TestDto>.Context context, Collector<TestDto> collector) throws Exception {
// 通过age大小分流
if(testDto.getAge() > 10){
// 将流数据塞入上面创建的maxTestStream侧输出流
context.output(maxTestStream, testDto);
}else{
// 将流数据依然放入当前输出流
collector.collect(testDto);
}
}
});
打印流信息
originStream.print("sumStream");
filter
用于过滤接收的流
// 过滤接收的数据,只接收age=11的流数据放入sinkStream流对象
SingleOutputStreamOperator<String> sinkStream = originStream.filter(new FilterFunction<TestDto>() {
@Override
public boolean filter(TestDto testDto) throws Exception {
return testDto.getAge() == 11;
}
}).map(new MapFunction<TestDto, String>() {
// 转成String
@Override
public String map(TestDto testDto) throws Exception {
return JSON.toJSONString(testDto);
}
});
getSideOutput
获取测输出流数据
// 上面是在originStream流对象将流数据分给maxTestStream侧输出流的,所以需要如下调用
DataStream<TestDto> sideOutput = originStream.getSideOutput(maxTestStream);
keyBy
将获取到的流对指定属性分组
// 按照对象的name进行分组
KeyedStream<TestDto, Tuple> keyedStream = sideOutput.keyBy("name");
max
分组后返回指定属性最大的值(返回的指定属性最大的值是正确的,但是其中的其他属性可能不准确)
DataStream<TestDto> maxStream = keyedStream.max("age");
maxBy
分组后返回指定属性最大的值(返回的最大值与相应的其他字段内容都是准确的)
DataStream<TestDto> maxByStream = keyedStream.maxBy("age");
sum
分组后返回指定属性累加值
DataStream<TestDto> sumStream = keyedStream.sum("age");
reduce
分组后可操作新数据及上一次数据,可进行累加、比较等逻辑操作
// 每次收到数据会按照分组信息分组,并累加上一次的age数据后存入reduceStream流对象,也可实现其他逻辑
SingleOutputStreamOperator<TestDto> reduceStream = keyedStream.reduce(new ReduceFunction<TestDto>() {
@Override
public TestDto reduce(TestDto beforeDto, TestDto nowDto) throws Exception {
log.info("----------- reduce beforeDto:" + beforeDto);
log.info("----------- reduce nowDto:" + nowDto);
return beforeDto.setAge(beforeDto.getAge() + nowDto.getAge());
}
});
connect
可合并两个不同数据类型的流
// 连接两个不同数据类型的流,类型为String的流对象sinkStream与类型为TestDto的流对象sideOutput合并
ConnectedStreams<String, TestDto> connectStream = sinkStream.connect(sideOutput);
DataStream<Object> connectObjectStream = connectStream.map(new CoMapFunction<String, TestDto, Object>() {
@Override
public Object map1(String s) throws Exception {
// 对第一个sinkStream流对象中的数据进行操作并返回
return new Tuple2<String,String>("stringStream",s);
}
@Override
public Object map2(TestDto testDto) throws Exception {
// 对第二个sideOutput流对象中的数据进行操作并返回
return new Tuple2<String,TestDto>("objectStream",testDto);
}
});
union
可合并两个相同数据类型的流
// union 的使用(同一种数据类型的流才能用这个方法)
DataStream<TestDto> unionStream = sideOutput.union(originStream);
addSink
为流对象指定数据发送目标
// 设置 RabbitMQ 连接配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("192.168.117.4")
.setPort(5672)
.setVirtualHost("/")
.setUserName("mix")
.setPassword("jovision")
.build();
// 将数据发送至名为demo.out的队列
sinkStream.addSink(new RMQSink<>(
connectionConfig,
"demo.out",
new SimpleStringSchema()
));
execute
运行执行环境(在 addSink 之后执行)
// 执行
environment.execute();
二、Flink 连接器(数据源、数据写入)
在 Flink 官网中,当前自带的且常用的连接器如下:
RabbitMQ 已经有自带的【数据源连接器】以及【数据写入连接器】。
Redis、jdbc 只有自带的【数据写入连接器】,所以数据源连接器需要实现【SourceFunction】接口进行自定义。
RabbitMQ 连接器
数据源连接器(可实时消费消息)
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("192.168.117.4")
.setPort(5672)
.setVirtualHost("/")
.setUserName("mix")
.setPassword("jovision")
.build();
// 创建 RabbitMQ 数据源
DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
connectionConfig,
"demo.in",
true,
new SimpleStringSchema()
));
数据写入连接器
rabbitMQStream.addSink(new RMQSink<>(
connectionConfig,
"demo.out",
new SimpleStringSchema()
));
Kafka 连接器
数据源连接器(可实时消费消息)
Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", "192.168.2.198:9092");
sourceProperties.setProperty("group.id", "test");
// 创建Kafka消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"vse.unicom.payload.channel", // 源 topic
new SimpleStringSchema(), // 数据序列化方式
sourceProperties // Kafka消费者配置
);
DataStream<String> sourceStream = environment.addSource(consumer);
数据写入连接器
Properties sinkProperties = new Properties();
sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.198:9092");
sinkProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
sinkProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
sinkProperties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
sinkProperties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
sinkProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
sinkProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
sinkProperties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "4194304");
FlinkKafkaProducer<String> channelProducer = new FlinkKafkaProducer<String>(
"vse.unicom.payload.tenant", // 目标 topic
new SimpleStringSchema(), // 序列化 schema
sinkProperties);
sourceStream.addSink(channelProducer);
Redis 连接器
数据源连接器(需自定义,单次执行,不可监听 redis 数据变更)
public static class RedisSource implements SourceFunction<Tuple3<String,String,String>> {
@Override
public void run(SourceContext<Tuple3<String,String,String>> sourceContext) throws Exception {
// 连接 Redis
Jedis jedis = new Jedis("192.168.117.4", 6379);
// 设置密码(如果需要的话)
jedis.auth("redis@Abc-1234");
// 选择数据库
jedis.select(0);
for (int i = 0; i < 10; i++){
String key = "flink:test" + i;
String key2 = "flink:map" + i;
jedis.set( key, String.valueOf(new Random().nextInt()));
sourceContext.collect(new Tuple3<>(key,key2,jedis.get(key)));
}
}
@Override
public void cancel() {
}
}
数据写入连接器
public static class CoustomRedisSink implements RedisMapper<Tuple2<String, String>> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}
@Override
public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
return stringStringTuple2._1();
}
@Override
public String getValueFromData(Tuple2<String, String> stringStringTuple2) {
return stringStringTuple2._2();
}
}
jdbc 连接器
数据源连接器(需自定义,单次执行,不可监听 mysql 数据变更)
public static class MysqlSource implements SourceFunction<MeshVcDto> {
@Override
public void run(SourceContext<MeshVcDto> sourceContext) throws Exception {
// 定义数据库连接信息
String dbURL = "jdbc:mysql://192.168.117.4:3306/jvs_tdms";
String username = "root";
String password = "Jo123@My";
// 连接数据库
Connection conn = DriverManager.getConnection(dbURL, username, password);
// 执行查询
String query = "SELECT id, device_id, mesh_vc, add_time FROM udms_mesh_vc_log where verification_code is null";
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(query);
// 处理查询结果
while (rs.next()) {
Long id = rs.getLong("id");
String deviceId = rs.getString("device_id");
String meshVc = rs.getString("mesh_vc");
Date addTime = rs.getDate("add_time");
sourceContext.collect(new MeshVcDto().setId(id).setMeshVc(meshVc).setDeviceId(deviceId).setAddTime(addTime));
}
}
@Override
public void cancel() {
}
}
数据写入连接器
dataStream.addSink(JdbcSink.sink("update udms_mesh_vc_log set device_id = ?, mesh_vc = ? where id = ?", new JdbcStatementBuilder<MeshVcDto>() {
@Override
public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
preparedStatement.setString(1, meshVcDto.getDeviceId());
preparedStatement.setString(2, meshVcDto.getMeshVc());
preparedStatement.setLong(3, meshVcDto.getId());
}
},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.jdbc.Driver")
.withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
.withUsername("root")
.withPassword("Jo123@My")
.build()));
dataStream.addSink(JdbcSink.sink("insert into udms_mesh_vc_log (device_id, mesh_vc) values (?,?)", new JdbcStatementBuilder<MeshVcDto>() {
@Override
public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
preparedStatement.setString(1, meshVcDto.getDeviceId());
preparedStatement.setString(2, meshVcDto.getMeshVc());
}
},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.jdbc.Driver")
.withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
.withUsername("root")
.withPassword("Jo123@My")
.build()));
三、Flink 常用 Window
窗口计算,需要先分组,然后指定窗口类型,然后编写计算逻辑
- Window 可以分为两大类:
- CountWindow:按照指定的数据条数生产一个 Window,只有数据数量有关,分为如下两类
- 滚动计数窗口 ( Tumbling Count Window),窗口没有重叠
- 滑动计数窗口 ( Sliding Count Window),窗口有重叠
- TimeWindow:按照时间生成 Window,根据窗口实现原理的不同分为三类:
- 滚动时间窗口 ( Tumbling Time Window),窗口没有重叠
- 滑动时间窗口 ( Sliding Time Window),窗口有重叠
- 会话窗口 ( Session Window),窗口开始结束时间不固定,在一个固定的时间周期没有新的元素接收,会自动关闭窗口
- CountWindow:按照指定的数据条数生产一个 Window,只有数据数量有关,分为如下两类
CountWindow
滚动计数窗口
// 只有当前这个name的消息收到第3次,才会计算前三次的age和,否则不会计算并输出至mq
SingleOutputStreamOperator<String> map1 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
@Override
public TestDto map(String s) throws Exception {
return JSON.parseObject(s, TestDto.class);
}
}).keyBy("name")
.countWindow(3)
.sum("age").map(new MapFunction<TestDto, String>() {
@Override
public String map(TestDto testDto) throws Exception {
return JSON.toJSONString(testDto);
}
});
滑动计数窗口
// 只有当前这个name的消息收到第2次,才会计算前5次的和,否则不会计算并输出至mq
SingleOutputStreamOperator<String> map2 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
@Override
public TestDto map(String s) throws Exception {
return JSON.parseObject(s, TestDto.class);
}
}).keyBy("name")
.countWindow(5, 2)
.sum("age").map(new MapFunction<TestDto, String>() {
@Override
public String map(TestDto testDto) throws Exception {
return JSON.toJSONString(testDto);
}
});
TimeWindow
滚动时间窗口
注意事项:
触发计算的动作与并行度大小有关系,一个窗口只有接收消息的数量达到并行度之后,才能触发上个窗口的计算并输出
例如:设置为 1 并行度时,[0,15)窗口已经过去,但是不会马上输出计算结果,来到[15,30)窗口,只有接收了 1 条消息,才会触发[0,15)窗口的计算并输出
// 迟到较晚的流数据存储至本侧输出流
OutputTag<TestDto> testDtoOutputTag = new OutputTag<TestDto>("迟到了") {};
// 只有当前这个name的消息一直收到15s,才会计算这15s时间段的age和,否则不会计算并输出至mq
SingleOutputStreamOperator<TestDto> name = rabbitMQStream
// window截止后继续等2秒,将window范围内的时间加入到计算
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TestDto>(Time.seconds(2)) {
@Override
public long extractTimestamp(TestDto testDto) {
return testDto.getTimeStamp() * 1000;
}
})
.keyBy("name")
.window(TumblingEventTimeWindows.of(Time.seconds(15)))
// 迟到数据,在事件时间窗口内的消息,窗口结束后的5秒内收到的消息都被允许
.allowedLateness(Time.seconds(5))
// 迟到太多的数据会被放到侧输出流中进行补偿处理
.sideOutputLateData(testDtoOutputTag)
.aggregate(new AggregateFunction<TestDto, TestDto, TestDto>() {
// 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
@Override
public TestDto createAccumulator() {
return new TestDto().setAge(0);
}
// 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。
// 方法传入两个参数:当前新到的数据value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
@Override
public TestDto add(TestDto input, TestDto init) {
return input.setAge(init.getAge() + input.getAge());
}
// 从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。
// 比如之前我们提到的计算平均值,就可以把sum和count作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
@Override
public TestDto getResult(TestDto testDto) {
return testDto.setTimeStamp(new Date().getTime() / 1000).setDate(LocalDateTime.now().toString());
}
// 合并两个累加器,并将合并后的状态作为一个累加器返回。
// 这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
@Override
public TestDto merge(TestDto testDto, TestDto acc1) {
return null;
}
}, new ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>() {
// 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
@Override
public void process(Tuple tuple, ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>.Context context, Iterable<TestDto> iterable, Collector<TestDto> collector) throws Exception {
iterable.forEach(item -> {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
item.setWindowStartTime(context.window().getStart());
item.setWindowEndTime(context.window().getEnd());
log.info("窗口:[{}, {}) 当前时间:{}, item: {}", format.format(new Date(context.window().getStart())),
format.format(new Date(context.window().getEnd())), format.format(new Date()), item);
collector.collect(item);
});
}
});
四、Flink 部署
服务启动
● 进入 flink 的 bin 目录,启动 start-cluster.sh
./start-cluster.sh
● 进入 flink 的 conf 目录,在 flink-conf.yaml 文件中查看启动端口(rest.port 参数值)
vi flink-conf.yaml
● 输入服务器 ip 及端口
● 启动完成
页面操作任务
上传 jar 文件
将使用流处理 API 实现的程序打包为 jar 文件,并上传
上传成功
配置启动类并启动任务
配置完之后点击【Submit】即可执行任务
停止任务
命令操作任务
进入 flink 的 bin 目录下
查看正在执行的任务列表
./flink list
结果展示
取消任务
● 取消正在执行的指定任务
./flink cancel 【正在执行的任务的id值】-s 【保存点文件夹名】
或
./flink cancel 【正在执行的任务的id值】 (不会有保存点文件)
或
./flink cancel -s 【正在执行的任务的id值】
例:
./flink cancel 0fa34b7daa017c510adb3692e55d4c96 -s 1234
或
./flink cancel 0fa34b7daa017c510adb3692e55d4c96 (不会有保存点文件)
或
./flink cancel -s 0fa34b7daa017c510adb3692e55d4c96
结果展示
启动任务
● 无安全点文件启动任务
./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】
例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
● 有安全点文件启动任务(前提是取消任务时有安全点文件保存)
./flink run -p 【Parallelism】 -s 【执行取消任务的安全点的目录】-c 【EntryClass】[--allowNonRestoredState] 【已上传jar包的路径】 【Program Arguments】
注: --allowNonRestoredState 非必填,可绕过保存点恢复的错误继续启动任务,绕过错误可能会丢失数据,可先不带此配置启动,报错后再加上执行启动命令也可
例:./flink run -p 1 -s file:/tmp/flink/flinkpoint/savepoints/savepoint-029d78-73d8367621d7 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
结果展示
启动任务时 jar 包路径可通过以下两种方式获取:
● 上传 jar 包至 linux 系统后直接使用
● 在页面上传文件,并通过以下步骤获取:
○ 在 flink 页面上传文件完成,查看如下内容
○ 在 linux 中查看上一步拿到的路径,该路径就是已上传的 jar 包的缓存文件
五、Flink 扩容
flink 扩容是对 TaskManager 数量的扩大,相对应将执行任务的并行度随之扩大。
flink 任务卡槽设置
- 需要在服务启动前配置
- flink-conf.yaml 中配置 key/value 的时候在“:”后面需要有一个空格,否则配置不会生效。
- 单个 TaskManager 的任务卡槽数量需根据服务器资源配置
进入 flink 的 conf 目录,在 flink-conf.yaml 文件中查看单个 TaskManager 的任务卡槽数量(taskmanager.numberOfTaskSlots 参数值):
vi flink-conf.yaml
flink 任务并行度设置
- 需要在任务启动时配置
- 修改并行度时不能实时生效,需要重启任务
页面设置
命令设置
./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】
例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
原文地址:https://blog.csdn.net/m0_63164811/article/details/143849610
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!