自学内容网 自学内容网

Flink普通API之Source使用全解析

Flink普通API之Source使用全解析

一、引言

在Flink的流式计算世界里,Source作为数据的源头起着至关重要的作用。它能够为Flink任务提供数据输入,无论是批处理还是流处理场景,合适的Source选择与使用都能让数据处理流程顺利开启。本文将深入探讨Flink中Source的相关知识,包括预定义Source、自定义Source以及重要的Kafka Source,并结合实际案例与操作进行详细讲解。

二、预定义Source演示

(一)基于本地集合的Source

在Flink中,基于本地集合创建DataStream有多种方式:

  1. env.fromElements()
    • 这种方式支持多种数据类型,包括Tuple、自定义对象等复合形式。但需要注意的是,传入的参数类型要一致,若不一致可以用Object接收,但使用过程中可能会报错。例如env.fromElements("haha", 1);是不推荐的用法。从源码注释可知其对数据类型有一定要求与限制。
  2. env.fromCollection()
    • 它支持多种Collection的具体类型,如List,Set,Queue等。可以方便地将本地集合数据转换为DataStream。例如:
List<String> list = new ArrayList<>();
list.add("flink");
list.add("hadoop");
DataStream<String> dataStream = env.fromCollection(list);
  1. env.fromSequence()
    • 该方法用于创建基于开始和结束的DataStream,一般用于学习测试时编造数据。例如env.fromSequence(1, 10);会生成从1到10的整数序列的DataStream。

在代码中还可以指定并行度:

  • 指定全局并行度
env.setParallelism(12);
  • 获得全局并行度
int parallelism = env.getParallelism();
  • 指定算子设置并行度与获取指定算子并行度
DataStreamSource<String> eventSource = env.fromElements("a", "b", "c");
// 指定算子并行度
eventSource.setParallelism(5);
// 获取指定算子并行度
int operatorParallelism = eventSource.getParallelism();

(二)基于文件的Source

Flink可以从本地文件读取数据创建DataStream。例如:

DataStream<String> fileStream = env.readTextFile("path/to/file.txt");

(三)基于网络套接字(socketTextStream)

  • socketTextStream方法socketTextStream(String hostname, int port)是一个非并行的Source。它需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,如socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符(默认行分隔符是”\n”)和最大重新连接次数(默认最大重新连接次数为0)。
  • 使用提示:如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务。Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令,如nc -lp 8888
  • 代码演示:以下代码用于演示统计socket中的单词数量,体会流式计算的魅力!并且通过在代码中打印并行度,可以发现socketTextStream获取到的dataStream,并行度为1。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> socketStream = env.socketTextStream("localhost", 8888);
// 后续进行单词计数等处理操作
env.execute();

三、自定义Source

Flink提供了多种自定义Source的接口和类:

  1. SourceFunction:非并行数据源(并行度只能 = 1)接口。
  2. RichSourceFunction:多功能非并行数据源(并行度只能 = 1)类。
  3. ParallelSourceFunction:并行数据源(并行度能够 >= 1)接口。
  4. RichParallelSourceFunction:多功能并行数据源(并行度能够 >= 1)类,建议使用。这里的“Rich”字样代表富有,意味着在编程中可以调用更多的方法,功能更加全面。例如通过ParallelSourceFunction创建可并行Source,如果代码换成ParallelSourceFunction,每次生成12个数据(假设是12核数的环境),可以充分利用多核并行处理能力。

四、Kafka Source

Kafka作为常用的消息队列系统,在Flink中可以作为重要的数据来源。

  1. Kafka相关操作复习
    • Zookeeper启动
[root@bigdata01 app]# zk.sh start
---------- bigdata01 ----------
Starting zookeeper... STARTED
---------- bigdata02 ----------
Starting zookeeper... STARTED
---------- bigdata03 ----------
Starting zookeeper... STARTED
[root@bigdata01 app]# zk.sh status
---------- bigdata01 ----------
Mode: follower
---------- bigdata02 ----------
Mode: leader
---------- bigdata03 ----------
Mode: follower
  • Kafka启动kf.sh start
  • Kafka可视化界面(选做)./kafkaUI.sh start,可通过http://bigdata01:8889访问。
  • 创建Topic:可以通过界面创建,也可以通过命令创建,如bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic first
  • 控制台生产者与消费者
    • 控制台生产者:bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic first
    • 控制台消费者:bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic first
  1. Flink使用Kafka Source代码示例
package com.bigdata.day02;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
        // 以下代码跟flink消费kakfa数据没关系,仅仅是将需求搞的复杂一点而已
        // 返回true的数据就保留下来,返回false直接丢弃
        dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                // 查看单词中是否包含success字样
                return word.contains("success");
            }
        }).print();

        env.execute();
    }
}

五、总结

在Flink任务开发中,Source的选择与正确使用是构建数据处理管道的第一步。无论是本地集合、文件、网络套接字还是强大的Kafka作为数据源,都有其各自的特点与适用场景。开发者需要根据实际业务需求、数据来源特点以及系统架构等因素综合考虑,选择最合适的Source,并合理设置相关参数如并行度等,以确保数据能够高效、准确地流入Flink任务进行后续处理。同时,对于自定义Source的了解也为处理特殊数据源或定制化数据输入需求提供了可能,进一步拓展了Flink在大数据处理领域的应用灵活性。


原文地址:https://blog.csdn.net/qq_68076599/article/details/143991446

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!