
Flink SQL in Practice: From Basic Development to Kafka and MySQL Integration

Table of Contents

1. Overview of Flink SQL Development Steps

2. A Simple Demo, Dissected

FlinkSQL-API

3. Flink SQL: Kafka to Kafka in Practice

4. Flink SQL: Kafka to MySQL in Depth

5. Flink SQL: Reading from MySQL

6. Summary


In the big-data world, Apache Flink is prized for its unified stream-and-batch processing, and Flink SQL gives developers an efficient, convenient way to work with data, handling complex scenarios through SQL-like syntax. This post walks through Flink SQL's development steps and core concepts, along with common use cases, including how it interacts with Kafka and MySQL.

1. Overview of Flink SQL Development Steps

Adding Dependencies

The first step of any Flink SQL project is adding the right dependencies. In the Flink ecosystem there are two common approaches to converting between DataStream and Table (detailed later alongside the code); each comes with its own dependency requirements and processing logic, and together they form the foundation of a Flink SQL project.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.13.6</version>
  <scope>provided</scope>
</dependency>
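
Note: with provided scope this jar is absent from the runtime classpath, so for runs inside the IDE you may need to enable "include dependencies with provided scope" in the run configuration. Depending on your setup, the Blink planner may also be required at compile time; a hedged sketch for Flink 1.13 (verify the artifact against your own build):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.13.6</version>
  <scope>provided</scope>
</dependency>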

Converting Between DataStream and Table

DataStream -> Table

There are several ways to do this. The two approaches each have strengths and suitable scenarios that directly affect how efficiently downstream processing runs; choosing the right one for your data structures and latency requirements avoids unnecessary performance overhead and code complexity.

Option 1: register the DataStream as a temporary view via createTemporaryView and query it by name.

Option 2: convert the DataStream into a Table object via fromDataStream.
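
A minimal sketch of the two approaches, assuming a StreamTableEnvironment tEnv and a DataStream<String> stream already exist (the names here are illustrative; full runnable demos follow below):

// Option 1: register the stream as a temporary view, then query it by name
tEnv.createTemporaryView("t_words", stream);
Table t1 = tEnv.sqlQuery("select * from t_words");

// Option 2: convert the stream directly into a Table object
Table t2 = tEnv.fromDataStream(stream);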

Table -> DataStream

This conversion likewise needs to be chosen according to business requirements and how the data is updated: toAppendStream and toRetractStream have very different purposes.
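
In code, the two conversions look roughly like this (a sketch; table stands for any query result):

// Append-only: each result row is emitted once and never revised
DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);

// Retract: Tuple2.f0 == true marks an insert/update-after, false marks a retraction
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);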

Query Operations

Both Table-style (DSL) queries and SQL-style queries are supported. The SQL style is immediately familiar to anyone who knows traditional database SQL, so existing experience translates directly into query logic. The Table (DSL) style integrates more deeply with the Flink ecosystem: its functions and operators allow fine-grained control over the processing pipeline for complex business logic.

Table style / DSL style

SQL style
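
For the same word-count aggregation, the two styles look roughly like this (a sketch; the full runnable version appears in Demo08WordCount below):

// SQL style
Table r1 = tEnv.sqlQuery("select word, sum(num) as num from table1 group by word");

// Table/DSL style, on a Table object obtained via fromDataStream
Table r2 = table.groupBy($("word"))
        .select($("word"), $("num").sum().as("num"));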

2. A Simple Demo, Dissected

In the demo you will notice that when the DataStream holds Row values, the printed format is determined by Row's toString method, and the output markers carry meaning: "+I" indicates an inserted row. A first attempt using toAppendStream may fail on grouping operations, because toAppendStream only supports queries that produce new results without revising old ones. Once the business logic needs to update previously emitted results, switch to toRetractStream, which returns a DataStream<Tuple2<Boolean, Row>>: a boolean of true means an insert or update, false means a retraction. The corresponding markers are "+U" (after the update) and "-U" (before the update), and these fine-grained change flags let you track exactly how the data evolves.

package com.bigdata.day07;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;


public class Demo06FlinkSQL {

    public static void main(String[] args) throws Exception {

        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Obtain a table environment (tEnv) from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. source - load data
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 8899);

        // Register the DataStream as a Table
        tEnv.createTemporaryView("table1", streamSource);
        //3. transformation - process the data
        Table table = tEnv.sqlQuery("select * from table1");
        //table.printSchema();  // prints the table's schema, not its data

        //4. The data cannot be printed from the Table directly; convert it back to a DataStream
        DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);
        appendStream.print();


        //5. execute - run the job
        env.execute();
    }
}

Because the DataStream holds Row values, the printed format is whatever Row's toString method produces. The "+I" marker means the row is an insert.

Since we will be writing Flink SQL programs frequently, it is worth creating a file template:


#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "")package ${PACKAGE_NAME};#end
#parse("File Header.java")
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 @Description:
 @program:${PROJECT_NAME}
 @author:
 @create:${YEAR}-${MONTH}-${DAY} ${HOUR}:${MINUTE}:${SECOND}
**/
public class ${NAME} {

    public static void main(String[] args) throws Exception {

        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Obtain a table environment (tEnv) from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. create table objects
        //3. write the SQL
        //4. convert the Table back into a stream

        //5. execute - run the job
        env.execute();
    }
}
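
This is an IntelliJ IDEA file template (Velocity syntax); registering it under Settings -> Editor -> File and Code Templates lets every new class start from this skeleton.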

Based on this template, here is a word-count example:

First version:

package com.bigdata.day08;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Description:
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-28 10:04:27
 *
 *     Input:  hello,5
 *             hello,10        expected result: hello,15
 **/
public class _01FlinkSQLWordCount {

    public static void main(String[] args) throws Exception {

        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Obtain a table environment from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. source - load data
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 6666);

        // Convert the raw strings into (word, count) tuples
        DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] arr = s.split(",");
                String word = arr[0];
                int num = Integer.valueOf(arr[1]);
                return Tuple2.of(word, num);
            }
        });

        // Each tuple field becomes a column; unnamed fields default to f0, f1, ...
        tEnv.createTemporaryView("table1", mapStream, $("word"), $("num"));
        //3. run the SQL query
        Table tableResult = tEnv.sqlQuery("select word,sum(num) as sumNum from table1 group by word");
        //4. sink - emit the results
        DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);
        resultStream.print();

        //5. execute - run the job
        env.execute();
    }
}

This fails with:

Exception in thread "main" org.apache.flink.table.api.TableException: toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:395)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:186)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:354)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:343)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:342)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)

Solution: change

//DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);

to:

DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);

Second version:

package com.bigdata.day08;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Description:
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-28 10:04:27
 *
 *     Input:  hello,5
 *             hello,10        expected result: hello,15
 **/
public class _01FlinkSQLWordCount {

    public static void main(String[] args) throws Exception {

        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Obtain a table environment from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. source - load data
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 6666);

        // Convert the raw strings into (word, count) tuples
        DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] arr = s.split(",");
                String word = arr[0];
                int num = Integer.valueOf(arr[1]);
                return Tuple2.of(word, num);
            }
        });

        // Each tuple field becomes a column; unnamed fields default to f0, f1, ...
        tEnv.createTemporaryView("table1", mapStream, $("word"), $("num"));
        //3. run the SQL query
        Table tableResult = tEnv.sqlQuery("select word,sum(num) as sumNum from table1 group by word");
        //4. sink - emit the results
        // toAppendStream is append-only and cannot be used with grouping
        //DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);
        // toRetractStream supports retraction, so GROUP BY and similar SQL work
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);
        retractStream.print();

        //5. execute - run the job
        env.execute();
    }
}

toAppendStream: for queries that only produce new results and never revise old ones. SQL run through it must not contain grouping.

toRetractStream: for queries that update previously computed results. In the returned Tuple2, true marks an insert or update-after; false marks a retraction.

+I  insert

-U  before update

+U  after update
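
For the inputs hello,5 followed by hello,10, the retract stream prints something like the following (exact formatting varies slightly by Flink version, and print() may prefix each line with a subtask id):

(true,+I[hello, 5])
(false,-U[hello, 5])
(true,+U[hello, 15])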

FlinkSQL-API

Requirement: count the words in a DataStream using both the SQL style and the Table (DSL) style.

package com.bigdata.day07;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Description: word count implemented in both query styles
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-29 14:43:37
 **/
public class Demo08WordCount {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WC{
        private String word;
        private int num;
    }

    public static void main(String[] args) throws Exception {

        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Obtain a table environment (tenv) from env
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Input shape: zhangsan,3
        //TODO 2. source - prepare the View/Table
        DataStream<WC> dataStream = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );
        // dataStream -> View and Table
        tenv.createTemporaryView("table1",dataStream, $("word"), $("num"));

        //2. create the table object
        //3. write the SQL
        Table resultTable1 = tenv.sqlQuery("select word,sum(num) as num from table1 group by word");
        //4. convert the Table back into a stream
        DataStream<Tuple2<Boolean, WC>> resultDS1 = tenv.toRetractStream(resultTable1, WC.class);
        //resultDS1.print();
        // Second approach: the DSL style,
        // which requires a Table object
        Table table = tenv.fromDataStream(dataStream, $("word"), $("num"));

        //Table resultTable2 = table.groupBy($("word"))
        //        .select($("word"), $("num").sum().as("num")).filter($("num").isEqual(2));
        Table resultTable2 = table.groupBy($("word"))
               .select($("word"), $("num").sum().as("num")).as("word","num");

        // select word,sum(num) as num from xxx group by word;
        // select word,sum(num) as num from xxx group by word having num = 2
        // The result cannot be mapped onto the WC POJO as-is; either use Row, or alias the fields with
        // .as(...) after select. With the filter variant the names already line up, so no alias is needed.
        DataStream<Tuple2<Boolean, WC>> resultDS2 = tenv.toRetractStream(resultTable2, WC.class);
        resultDS2.print();


        //5. execute - run the job
        env.execute();
    }
}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:473)

If you hit the error above, check whether the WC POJO has a no-argument constructor.

Treat the code above as background: having to turn the DataStream into a Table and then the result back into a DataStream is rather cumbersome.

3. Flink SQL: Kafka to Kafka in Practice

Requirement: consume data from Kafka topic topic1, keep only the records whose status is success, and write them to Kafka topic topic2.

{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}

Code Implementation

Reference: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/

{"user_id": "1", "page_id":"1", "status": "success"}

CREATE TABLE table1 (
  `user_id` int,
  `page_id` int,
  `status` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic1',
  'properties.bootstrap.servers' = 'bigdata01:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
)

CREATE TABLE table2 (
  `user_id` int,
  `page_id` int,
  `status` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic2',
  'properties.bootstrap.servers' = 'bigdata01:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
)

All that remains is the SQL:
insert into table2  select * from table1 where status='success'

package com.bigdata.day08;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Description:
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-29 11:00:51
 **/
public class _02KafkaConnectorDemo {

    public static void main(String[] args) throws Exception {

        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // executeSql for DDL statements: returns a TableResult
        // sqlQuery for queries: returns a Table (the useful one)
        // Create a table backed by the Kafka topic topic1
        TableResult tableResult = tEnv.executeSql("CREATE TABLE table1 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // Create a table backed by the Kafka topic topic2
        tEnv.executeSql("CREATE TABLE table2 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic2',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'format' = 'json'\n" +
                ")");

        tEnv.executeSql("insert into table2 select * from table1 where status ='success'");

        // The job is complete at this point; below is the same logic split into two steps
        //TODO 3. transformation / query
        // Table result = tEnv.sqlQuery("select user_id,page_id,status from table1 where status='success'");
        // Write to Kafka via DDL
        // tEnv.executeSql("insert into table2 select * from " + result);


        //2. source - load data
        //3. transformation - process the data
        //4. sink - emit the results
        //5. execute - run the job
        // env.execute();
    }
}

Note: the query and the insert can be combined into a single statement, as the code above already does:

tEnv.executeSql("insert into table2 select * from table1 where status = 'success'");

If the program ends with env.execute();, Flink throws an error: executeSql already submits the job, so the final env.execute() has nothing to run. The error does not affect the result, but simply delete the trailing env.execute(); line to avoid it.

If you see the following error, the JSON format jar is missing:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.

Available factory identifiers are:

raw
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:751)

Add this dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.13.6</version>
</dependency>


Test it:

Start the producer first:

[root@bigdata01 ~]# kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic1
>{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}

Then start the consumer:

[root@bigdata01 ~]# kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic topic2
{"user_id":1,"page_id":1,"status":"success"}
{"user_id":1,"page_id":1,"status":"success"}
{"user_id":1,"page_id":1,"status":"success"}
{"user_id":1,"page_id":1,"status":"success"}

As the output shows, the producer sends data to topic1, and only the success records arrive in topic2.

4. Flink SQL: Kafka to MySQL in Depth

Requirement: consume data from Kafka topic topic1, keep only the records whose status is success, and write them to MySQL.

{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}

MySQL needs a database test1 containing a table t_success:

create table t_success
(
    id      int auto_increment,
    user_id int         null,
    page_id int         null,
    status  varchar(20) null,
    constraint t_success_pk
        primary key (id)
);
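
Note that the Flink sink table defined in the code below declares only user_id, page_id, and status; the auto-increment id column is filled in by MySQL on insert.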

Code Implementation

package com.bigdata.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Description:
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-29 11:00:51
 **/
public class _02KafkaToMySQLDemo {

    public static void main(String[] args) throws Exception {

        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // executeSql for DDL statements: returns a TableResult
        // sqlQuery for queries: returns a Table (the useful one)
        // Create a table backed by the Kafka topic topic1
        TableResult tableResult = tEnv.executeSql("CREATE TABLE table1 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topic1',\n" +
                "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // Create a sink table backed by the MySQL table t_success
        tEnv.executeSql("CREATE TABLE table2 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://bigdata01:3306/test1?useUnicode=true&characterEncoding=utf8',\n" +
                "    'table-name' = 't_success', \n" +
                "    'username' = 'root',\n" +
                "    'password' = '123456'\n" +
                ")");

        tEnv.executeSql("insert into table2 select * from table1 where status ='success'");

        // The job is complete at this point; below is the same logic split into two steps
        //TODO 3. transformation / query
        // Table result = tEnv.sqlQuery("select user_id,page_id,status from table1 where status='success'");
        // Write via DDL
        // tEnv.executeSql("insert into table2 select * from " + result);

        //2. source - load data
        //3. transformation - process the data
        //4. sink - emit the results
        //5. execute - run the job
        // env.execute();
    }
}
//1. Prepare the topics
//2. Start the program
//3. Send data to topic1
//   kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic1
//4. Watch table2 (i.e. the MySQL table t_success)
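
For the JDBC sink to work, the JDBC connector and a MySQL driver must also be on the classpath. A hedged example for Flink 1.13 (the driver version is an assumption; match it to your MySQL server):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>1.13.6</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.49</version>  <!-- assumption: pick the version matching your MySQL server -->
</dependency>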

5. Flink SQL: Reading from MySQL

Code Implementation

package com.bigdata.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @Program: FlinkDemo
 * @Package: com.bigdata.sql
 * @Class: _06ReadMySQL
 * @Author:
 * @Create: 2024-11-29  18:31:17
 * @Description:
 * @Version: 1.0
 */

public class _06ReadMySQL {

    public static void main(String[] args) throws Exception {

        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Obtain a table environment (tEnv) from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. create the table object

        tEnv.executeSql("CREATE TABLE table2 (\n" +
                "  `user_id` int,\n" +
                "  `page_id` int,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
                "  'table-name' = 't_success', \n" +
                "  'username' = 'root',\n" +
                "  'password' = '020403crk'\n" +
                ")");

        //4. convert the Table back into a stream
        Table table = tEnv.sqlQuery("select * from table2");
        table.printSchema();
        tEnv.toAppendStream(table, Row.class).print();

        //5. execute - run the job
        env.execute();
    }
}
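
Unlike the Kafka source, this JDBC table is read as a bounded scan: the job reads the current contents of t_success once and then finishes, which is why toAppendStream is safe here.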

6. Summary

By walking through the basic development steps of Flink SQL, digging into a simple demo, and working hands-on with the typical Kafka and MySQL combinations, we have seen both its power and its convenience. From simple transformations and stateful result updates to bridging Kafka message queues and MySQL databases, Flink SQL handles it all. Mastering these patterns helps developers build stable, high-performance data applications efficiently, whether for real-time processing, analytics, or persistence.


Original article: https://blog.csdn.net/weixin_64726356/article/details/144131376
