Flink普通API之Source使用全解析
Flink普通API之Source使用全解析
一、引言
在Flink的流式计算世界里,Source作为数据的源头起着至关重要的作用。它能够为Flink任务提供数据输入,无论是批处理还是流处理场景,合适的Source选择与使用都能让数据处理流程顺利开启。本文将深入探讨Flink中Source的相关知识,包括预定义Source、自定义Source以及重要的Kafka Source,并结合实际案例与操作进行详细讲解。
二、预定义Source演示
(一)基于本地集合的Source
在Flink中,基于本地集合创建DataStream有多种方式:
- env.fromElements():
- 这种方式支持多种数据类型,包括Tuple、自定义对象等复合形式。但需要注意的是,传入的参数类型要一致,若不一致可以用Object接收,但使用过程中可能会报错。例如
env.fromElements("haha", 1);
是不推荐的用法。从源码注释可知其对数据类型有一定要求与限制。
- 这种方式支持多种数据类型,包括Tuple、自定义对象等复合形式。但需要注意的是,传入的参数类型要一致,若不一致可以用Object接收,但使用过程中可能会报错。例如
- env.fromCollection():
- 它支持多种Collection的具体类型,如List,Set,Queue等。可以方便地将本地集合数据转换为DataStream。例如:
List<String> list = new ArrayList<>();
list.add("flink");
list.add("hadoop");
DataStream<String> dataStream = env.fromCollection(list);
- env.fromSequence():
- 该方法用于创建基于开始和结束的DataStream,一般用于学习测试时编造数据。例如
env.fromSequence(1, 10);
会生成从1到10的整数序列的DataStream。
- 该方法用于创建基于开始和结束的DataStream,一般用于学习测试时编造数据。例如
在代码中还可以指定并行度:
- 指定全局并行度:
env.setParallelism(12);
- 获得全局并行度:
int parallelism = env.getParallelism();
- 指定算子设置并行度与获取指定算子并行度:
DataStreamSource<String> eventSource = env.fromElements("a", "b", "c");
// 指定算子并行度
eventSource.setParallelism(5);
// 获取指定算子并行度
int operatorParallelism = eventSource.getParallelism();
(二)基于文件的Source
Flink可以从本地文件读取数据创建DataStream。例如:
DataStream<String> fileStream = env.readTextFile("path/to/file.txt");
(三)基于网络套接字(socketTextStream)
- socketTextStream方法:
socketTextStream(String hostname, int port)
是一个非并行的Source。它需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,如socketTextStream(String hostname, int port, String delimiter, long maxRetry)
,这个重载的方法可以指定行分隔符(默认行分隔符是”\n”)和最大重新连接次数(默认最大重新连接次数为0)。 - 使用提示:如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务。Mac或Linux用户可以在命令行终端输入
nc -lk 8888
启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令,如nc -lp 8888
。 - 代码演示:以下代码用于演示统计socket中的单词数量,体会流式计算的魅力!并且通过在代码中打印并行度,可以发现
socketTextStream
获取到的dataStream
,并行度为1。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> socketStream = env.socketTextStream("localhost", 8888);
// 后续进行单词计数等处理操作
env.execute();
三、自定义Source
Flink提供了多种自定义Source的接口和类:
- SourceFunction:非并行数据源(并行度只能 = 1)接口。
- RichSourceFunction:多功能非并行数据源(并行度只能 = 1)类。
- ParallelSourceFunction:并行数据源(并行度能够 >= 1)接口。
- RichParallelSourceFunction:多功能并行数据源(并行度能够 >= 1)类,建议使用。这里的“Rich”字样代表富有,意味着在编程中可以调用更多的方法,功能更加全面。例如通过
ParallelSourceFunction
创建可并行Source,如果代码换成ParallelSourceFunction
,每次生成12个数据(假设是12核数的环境),可以充分利用多核并行处理能力。
四、Kafka Source
Kafka作为常用的消息队列系统,在Flink中可以作为重要的数据来源。
- Kafka相关操作复习:
- Zookeeper启动:
[root@bigdata01 app]# zk.sh start
---------- bigdata01 ----------
Starting zookeeper... STARTED
---------- bigdata02 ----------
Starting zookeeper... STARTED
---------- bigdata03 ----------
Starting zookeeper... STARTED
[root@bigdata01 app]# zk.sh status
---------- bigdata01 ----------
Mode: follower
---------- bigdata02 ----------
Mode: leader
---------- bigdata03 ----------
Mode: follower
- Kafka启动:
kf.sh start
- Kafka可视化界面(选做):
./kafkaUI.sh start
,可通过http://bigdata01:8889
访问。 - 创建Topic:可以通过界面创建,也可以通过命令创建,如
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic first
。 - 控制台生产者与消费者:
- 控制台生产者:
bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic first
- 控制台消费者:
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic first
- 控制台生产者:
- Flink使用Kafka Source代码示例:
package com.bigdata.day02;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "bigdata01:9092");
properties.setProperty("group.id", "g1");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);
DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
// 以下代码跟flink消费kakfa数据没关系,仅仅是将需求搞的复杂一点而已
// 返回true的数据就保留下来,返回false直接丢弃
dataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String word) throws Exception {
// 查看单词中是否包含success字样
return word.contains("success");
}
}).print();
env.execute();
}
}
五、总结
在Flink任务开发中,Source的选择与正确使用是构建数据处理管道的第一步。无论是本地集合、文件、网络套接字还是强大的Kafka作为数据源,都有其各自的特点与适用场景。开发者需要根据实际业务需求、数据来源特点以及系统架构等因素综合考虑,选择最合适的Source,并合理设置相关参数如并行度等,以确保数据能够高效、准确地流入Flink任务进行后续处理。同时,对于自定义Source的了解也为处理特殊数据源或定制化数据输入需求提供了可能,进一步拓展了Flink在大数据处理领域的应用灵活性。
原文地址:https://blog.csdn.net/qq_68076599/article/details/143991446
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!