Flink SQL in Practice: From Basic Development to Kafka and MySQL Integration
Table of Contents
1. Overview of Flink SQL Development Steps
2. A Simple Demo, Dissected
3. Flink SQL in Practice: Kafka to Kafka
4. Flink SQL in Depth: Kafka to MySQL
5. Flink SQL: Reading from MySQL
6. Summary
In the big data world, Apache Flink stands out for its unified stream and batch processing, and Flink SQL gives developers an efficient, convenient way to tackle complex data scenarios with SQL-like syntax. In this article we walk through Flink SQL's development steps, core concepts, and common use cases, including how it works with Kafka and MySQL.
1. Overview of Flink SQL Development Steps
Add the dependencies:
The first step of any Flink SQL project is adding the right dependencies. Since the examples convert between DataStream and Table, there are two common approaches (not detailed here; they are covered later together with the code), and each approach comes with its own dependency requirements and processing logic. This is the foundation of the project setup.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
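A note on the classpath: with scope provided, the bridge jar above is not packaged at runtime, and the Table/SQL programs in this article also need a planner and the DataStream runtime. A sketch of the additional Maven dependencies these examples would typically rely on, assuming Flink 1.13.6 and Scala 2.11 (adjust to your own versions and packaging):
<!-- Blink planner used by the Table/SQL API (assumption: Flink 1.13.6 / Scala 2.11) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<!-- DataStream API plus local execution for running from the IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.6</version>
</dependency>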
Converting between DataStream and Table
DataStream -> Table:
There is more than one way to do this, and the two approaches have different strengths and fit different scenarios; choosing the right one for your data structure and latency requirements avoids unnecessary performance overhead and code complexity.
Approach 1: register the DataStream as a named view with tEnv.createTemporaryView(...), then query it with SQL.
Approach 2: turn the DataStream into a Table object with tEnv.fromDataStream(...), then query it with the DSL.
(Both are shown in the sketch after the query-style discussion below.)
Table -> DataStream:
This conversion also depends on the business requirements, in particular on whether results can be updated after they are first emitted: toAppendStream and toRetractStream serve very different purposes.
Query operations:
Both the Table style (DSL style) and the SQL style are supported. The SQL style is immediately familiar to anyone who knows traditional database SQL and lets you reuse that experience to write query logic quickly; the Table (DSL) style integrates more deeply with the Flink ecosystem, giving fine-grained control over the processing pipeline through its functions and operators.
Table style (DSL style) vs. SQL style: both are illustrated in the sketch below.
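The following is a minimal, self-contained sketch tying the steps above together: both DataStream-to-Table approaches, a SQL-style and an equivalent DSL-style query, and the conversion back to a DataStream. The class name and the sample data are illustrative; the APIs (createTemporaryView, fromDataStream, sqlQuery, the expression DSL, toRetractStream) are the same ones used in the demos below.
package com.bigdata.day07;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
public class ConversionAndQuerySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        DataStream<Tuple2<String, Integer>> stream =
                env.fromElements(Tuple2.of("hello", 5), Tuple2.of("hello", 10));
        // DataStream -> Table, approach 1: register the stream as a named view and query it with SQL
        tEnv.createTemporaryView("t1", stream, $("word"), $("num"));
        // DataStream -> Table, approach 2: obtain a Table object and query it with the DSL
        Table table = tEnv.fromDataStream(stream, $("word"), $("num"));
        // SQL style
        Table sqlResult = tEnv.sqlQuery("select word, sum(num) as total from t1 group by word");
        // Table/DSL style, equivalent to the SQL above
        Table dslResult = table.groupBy($("word"))
                .select($("word"), $("num").sum().as("total"));
        // Table -> DataStream: aggregated results are updating, so toRetractStream is required;
        // toAppendStream would only work for append-only (non-updating) results
        tEnv.toRetractStream(sqlResult, Row.class).print("sql");
        tEnv.toRetractStream(dslResult, Row.class).print("dsl");
        env.execute();
    }
}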
2. A Simple Demo, Dissected
Running the demo shows that, because the DataStream carries Row objects, the printed format is determined by Row's toString method, and the output carries a change-type marker: +I means a newly inserted record. In the first version of the code, toAppendStream runs into trouble as soon as grouping is involved, because it only supports queries that produce new results and never modify old ones. As soon as the business logic needs to update previously computed results, we have to switch to toRetractStream, which returns a DataStream<Tuple2<Boolean, Row>>: true marks an insert or the new value of an update, false marks a retracted (withdrawn) value, with the corresponding markers +U (after update) and -U (before update). These fine-grained markers let developers track exactly how the data changes.
package com.bigdata.day07;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class Demo06FlinkSQL {
public static void main(String[] args) throws Exception {
//1. env - set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// Obtain a table environment from the stream environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2. source - load data
DataStreamSource<String> streamSource = env.socketTextStream("localhost", 8899);
// Register the DataStream as a table (view)
tEnv.createTemporaryView("table1",streamSource);
//3. transformation - process the data
Table table = tEnv.sqlQuery("select * from table1");
//table.printSchema(); prints the table schema, not the data
//4. The Table itself cannot be printed directly; convert it back into a DataStream
DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);
appendStream.print();
//5. execute - run the job
env.execute();
}
}
Because the DataStream carries Row objects, the printed format is determined by Row's toString method; the +I marker means the record is a new insert.
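For example, with the socket opened via nc -lk 8899 and a line such as hello typed into it, the printed record looks roughly like this (the subtask prefix, e.g. 3>, will vary):
3> +I[hello]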
Since we write Flink SQL jobs all the time, it is worth creating a file template (for example an IntelliJ IDEA file template):
#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "")package ${PACKAGE_NAME};#end
#parse("File Header.java")
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
@Description:
@program:${PROJECT_NAME}
@author:
@create:${YEAR}-${MONTH}-${DAY} ${HOUR}:${MINUTE}:${SECOND}
**/
public class ${NAME} {
public static void main(String[] args) throws Exception {
//1. env - set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// Obtain a table environment from the stream environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2. Create the table(s)
//3. Write the SQL
//4. Convert the Table back into a stream
//5. execute - run the job
env.execute();
}
}
Based on this template we can build a word-count example.
First version:
package com.bigdata.day08;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @Description:
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-28 10:04:27
 *
 * Input: hello,5 then hello,10 -> expected result: hello,15
 **/
public class _01FlinkSQLWordCount {
public static void main(String[] args) throws Exception {
//1. env - set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// Obtain a table environment from the stream environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2. source - load data
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 6666);
// Parse each line into (word, count)
DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] arr = s.split(",");
String word = arr[0];
int num = Integer.valueOf(arr[1]);
return Tuple2.of(word, num);
}
});
// Each tuple field becomes a column; without explicit names the defaults are f0, f1
tEnv.createTemporaryView("table1",mapStream,$("word"),$("num"));
//3. Run the SQL query
Table tableResult = tEnv.sqlQuery("select word,sum(num) as sumNum from table1 group by word");
//4. sink - output the data
DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);
resultStream.print();
//5. execute - run the job
env.execute();
}
}
This fails with:
Exception in thread "main" org.apache.flink.table.api.TableException: toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:395)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:186)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:354)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:343)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:342)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
Solution:
//DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);
change it to:
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);
Second version:
package com.bigdata.day08;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @Description:
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-28 10:04:27
 *
 * Input: hello,5 then hello,10 -> expected result: hello,15
 **/
public class _01FlinkSQLWordCount {
public static void main(String[] args) throws Exception {
//1. env - set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// Obtain a table environment from the stream environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2. source - load data
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 6666);
// Parse each line into (word, count)
DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] arr = s.split(",");
String word = arr[0];
int num = Integer.valueOf(arr[1]);
return Tuple2.of(word, num);
}
});
// Each tuple field becomes a column; without explicit names the defaults are f0, f1
tEnv.createTemporaryView("table1",mapStream,$("word"),$("num"));
//3. Run the SQL query
Table tableResult = tEnv.sqlQuery("select word,sum(num) as sumNum from table1 group by word");
//4. sink - output the data
// toAppendStream only emits inserts, so it cannot be used with grouping
//DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);
// toRetractStream supports retractions, so GROUP BY and other updating queries work
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);
retractStream.print();
//5. execute - run the job
env.execute();
}
}
toAppendStream: suited to queries that only produce new results and never modify previously emitted ones; the SQL must not contain grouping.
toRetractStream: suited to queries that update previously computed results; true marks an insert or the new value of an update, false marks a retracted (withdrawn) value.
+I  insert
-U  before update
+U  after update
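For the inputs from this example (hello,5 followed by hello,10), the printed retract stream looks roughly like this (subtask prefixes omitted):
(true,+I[hello, 5])
(false,-U[hello, 5])
(true,+U[hello, 15])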
Flink SQL API:
Requirement: count words in a DataStream using both the SQL style and the Table (DSL) style.
package com.bigdata.day07;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @Description: word count implemented in both query styles
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-29 14:43:37
 **/
public class Demo08WordCount {
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class WC{
private String word;
private int num;
}
public static void main(String[] args) throws Exception {
//1. env - set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// Obtain a table environment from the stream environment
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// sample input format: zhangsan,3
//TODO 2. source / prepare the view and Table
DataStream<WC> dataStream = env.fromElements(
new WC("Hello", 1),
new WC("World", 1),
new WC("Hello", 1)
);
// dataStream -> view and Table
tenv.createTemporaryView("table1",dataStream, $("word"), $("num"));
//2. Create the table
//3. Write the SQL
Table resultTable1 = tenv.sqlQuery("select word,sum(num) as num from table1 group by word");
//4. Convert the Table back into a stream
DataStream<Tuple2<Boolean, WC>> resultDS1 = tenv.toRetractStream(resultTable1, WC.class);
//resultDS1.print();
// Second approach: the DSL style
// The DSL needs a Table object
Table table = tenv.fromDataStream(dataStream, $("word"), $("num"));
//Table resultTable2 = table.groupBy($("word"))
// .select($("word"), $("num").sum().as("num")).filter($("num").isEqual(2));
Table resultTable2 = table.groupBy($("word"))
.select($("word"), $("num").sum().as("num")).as("word","num");
// Equivalent SQL: select word, sum(num) as num from xxx group by word;
// With the filter it would be: select word, sum(num) as num from xxx group by word having num = 2
// The aggregation result cannot be mapped onto the WC POJO as-is; either use Row, or alias the fields with as(...) after the select. If the filter(...) is added, the result maps onto WC without the extra as.
DataStream<Tuple2<Boolean, WC>> resultDS2 = tenv.toRetractStream(resultTable2, WC.class);
resultDS2.print();
//5. execute - run the job
env.execute();
}
}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:473)
If you hit this error, check that the WC POJO has a no-argument constructor.
Treat the code above as background knowledge: having to convert a DataStream into a Table and the computed result back into a DataStream is cumbersome, which is exactly what the pure-SQL connector approach in the next sections avoids.
3. Flink SQL in Practice: Kafka to Kafka
Requirement: consume data from Kafka topic topic1, keep only the records whose status is success, and write them to Kafka topic topic2.
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
Implementation
Connector reference: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
{"user_id": "1", "page_id":"1", "status": "success"}
CREATE TABLE table1 (
`user_id` int,
`page_id` int,
`status` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'topic1',
'properties.bootstrap.servers' = 'bigdata01:9092',
'properties.group.id' = 'g1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
CREATE TABLE table2 (
`user_id` int,
`page_id` int,
`status` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'topic2',
'properties.bootstrap.servers' = 'bigdata01:9092',
'properties.group.id' = 'g1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
Then the job itself boils down to a single SQL statement:
insert into table2 select * from table1 where status='success'
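Running these DDL statements requires the Kafka SQL connector on the classpath. A sketch of the Maven dependency, assuming the same Flink 1.13.6 / Scala 2.11 setup used earlier in this article:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.6</version>
</dependency>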
package com.bigdata.day08;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @Description:
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-29 11:00:51
 **/
public class _02KafkaConnectorDemo {
public static void main(String[] args) throws Exception {
//1. env - set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// For DDL statements, executeSql returns a TableResult
// For queries, sqlQuery returns a Table (which you keep working with)
// Create a table backed by Kafka topic1 (the source)
TableResult tableResult = tEnv.executeSql("CREATE TABLE table1 (\n" +
" `user_id` int,\n" +
" `page_id` int,\n" +
" `status` STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'topic1',\n" +
" 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")");
// Create a table backed by Kafka topic2 (the sink)
tEnv.executeSql("CREATE TABLE table2 (\n" +
" `user_id` int,\n" +
" `page_id` int,\n" +
" `status` STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'topic2',\n" +
" 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
" 'format' = 'json'\n" +
")");
tEnv.executeSql("insert into table2 select * from table1 where status ='success'");
// The job above is already complete; the commented-out lines below show the query and the insert as two separate steps
//TODO 3. transformation / query
// Table result = tEnv.sqlQuery("select user_id,page_id,status from table1 where status='success'");
// Write to the Kafka sink table via DDL
// tEnv.executeSql("insert into table2 select * from " + result);
//2. source - load data
//3. transformation - process the data
//4. sink - output the data
//5. execute - run the job
// env.execute();
}
}
Note: with the DDL-based approach, the query and the write can be combined into a single statement:
tEnv.executeSql("insert into table2 select * from table1 where status = 'success' ")
If the program then ends with env.execute(), it throws an error: executeSql has already submitted the job, so env.execute() no longer has anything to run. The error does not affect the final result; simply delete the trailing env.execute().
If you see the following error, the JSON format dependency is missing:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
Available factory identifiers are:
raw
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:751)
Add this dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.13.6</version>
</dependency>
The flink-json jar can be found on Maven Central under the coordinates above.
Give it a try.
Start a producer first:
[root@bigdata01 ~]# kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic1
>{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
Then start a consumer:
[root@bigdata01 ~]# kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic topic2
{"user_id":1,"page_id":1,"status":"success"}
{"user_id":1,"page_id":1,"status":"success"}
{"user_id":1,"page_id":1,"status":"success"}
{"user_id":1,"page_id":1,"status":"success"}
As shown above, the producer sends both success and fail records to topic1, but only the success records arrive in topic2.
4. Flink SQL in Depth: Kafka to MySQL
Requirement: consume data from Kafka topic topic1, keep only the records whose status is success, and write them to MySQL.
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
MySQL needs a database named test1 containing a table t_success:
create table t_success
(
id int auto_increment,
user_id int null,
page_id int null,
status varchar(20) null,
constraint t_success_pk
primary key (id)
);
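Besides the Kafka and JSON dependencies from the previous section, writing to MySQL needs the JDBC connector and a MySQL driver on the classpath. A sketch, again assuming Flink 1.13.6 / Scala 2.11 (the driver version is an assumption; pick one that matches your MySQL server):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>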
Implementation
package com.bigdata.day08;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @Description:
 * @program:FlinkDemo
 * @author:
 * @create:2024-11-29 11:00:51
 **/
public class _02KafkaToMySQLDemo {
public static void main(String[] args) throws Exception {
//1. env - set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// For DDL statements, executeSql returns a TableResult
// For queries, sqlQuery returns a Table (which you keep working with)
// Create a table backed by Kafka topic1 (the source)
TableResult tableResult = tEnv.executeSql("CREATE TABLE table1 (\n" +
" `user_id` int,\n" +
" `page_id` int,\n" +
" `status` STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'topic1',\n" +
" 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")");
// Create a table backed by the MySQL table t_success (the sink)
tEnv.executeSql("CREATE TABLE table2 (\n" +
" `user_id` int,\n" +
" `page_id` int,\n" +
" `status` STRING\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://bigdata01:3306/test1?useUnicode=true&characterEncoding=utf8',\n" +
" 'table-name' = 't_success', \n" +
" 'username' = 'root',\n" +
" 'password' = '123456'\n" +
")");
tEnv.executeSql("insert into table2 select * from table1 where status ='success'");
// The job above is already complete; the commented-out lines below show the query and the insert as two separate steps
//TODO 3. transformation / query
// Table result = tEnv.sqlQuery("select user_id,page_id,status from table1 where status='success'");
// Write to the MySQL sink table via DDL
// tEnv.executeSql("insert into table2 select * from " + result);
//2. source - load data
//3. transformation - process the data
//4. sink - output the data
//5. execute - run the job
// env.execute();
}
}
Testing steps:
1. Create the topic.
2. Start the program.
3. Send data to topic1, for example:
kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic1
4. Check the contents of the MySQL table t_success (table2).
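A quick way to verify the result on the MySQL side, using the table created above:
select * from t_success;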
5. Flink SQL: Reading from MySQL
Implementation
package com.bigdata.sql;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @Program: FlinkDemo
* @Package: com.bigdata.sql
 * @Class: _06ReadMySQL
 * @Author:
 * @Create: 2024-11-29 18:31:17
 * @Description:
 * @Version: 1.0
*/
public class _06ReadMySQL {
public static void main(String[] args) throws Exception {
//1. env - set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// Obtain a table environment from the stream environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//2. Create the table (a JDBC-backed table pointing at MySQL)
tEnv.executeSql("CREATE TABLE table2 (\n" +
" `user_id` int,\n" +
" `page_id` int,\n" +
" `status` STRING\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/flink?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai',\n" +
" 'table-name' = 't_success', \n" +
" 'username' = 'root',\n" +
" 'password' = '020403crk'\n" +
")");
//3.-4. Query the table and convert the result back into a stream
Table table = tEnv.sqlQuery("select * from table2");
table.printSchema();
tEnv.toAppendStream(table, Row.class).print();
//5. execute - run the job
env.execute();
}
}
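Note that when the JDBC connector is used as a source like this, it performs a bounded scan: the job reads the current contents of t_success once and then finishes; it does not continuously watch the table for new rows.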
6. Summary
By walking through Flink SQL's basic development steps, digging into how the simple demo behaves, and working hands-on with the typical Kafka and MySQL scenarios, we have seen how powerful and convenient it is. From simple data transformations and updating aggregations to piping data between the Kafka message queue and the MySQL database, Flink SQL handles it all. Mastering these patterns helps developers build stable, high-performance data processing applications for real-time processing, analytics, and persistence alike.
Original article: https://blog.csdn.net/weixin_64726356/article/details/144131376