Flink SQL
-
进入 JobManager 容器:
docker exec -it 21442d9ca797 /bin/bash
-
启动 Flink 的 SQL 客户端:
/opt/flink/bin/sql-client.sh embedded
-
尝试创建 Kafka 表:
在启动的 SQL 客户端中,尝试创建一个 Kafka 表,看看是否能够成功:
CREATE TABLE test_kafka_table ( message STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'test_topic', 'properties.bootstrap.servers' = '110.40.130.231:9092', 'format' = 'json' );
如果没有报错,说明 Kafka 连接器已成功加载。
以下是一个使用 Flink SQL 从 Kafka 读取数据、进行简单聚合计算、并将结果写入 MySQL 和 HDFS 的示例。这个示例假设你已经安装并配置好了 Flink、Kafka、MySQL 和 HDFS。
1. 从 Kafka 读取数据
首先,创建一个 Kafka 表来定义数据源。假设 Kafka 主题名为 user_behavior
,包含用户行为数据,每条消息格式为 JSON,包含字段 user_id
, item_id
, category_id
, behavior
, ts
(时间戳)。
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime AS PROCTIME(), -- 添加处理时间列
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 设置水印,允许5秒延迟
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
2. 进行简单的聚合计算
接下来,对用户行为数据进行简单的聚合计算,例如按类别统计每分钟的行为次数。
CREATE VIEW behavior_count AS
SELECT
category_id,
TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start,
COUNT(*) as behavior_count
FROM user_behavior
GROUP BY category_id, TUMBLE(ts, INTERVAL '1' MINUTE);
使用了 TUMBLE
函数来创建滚动窗口,按每分钟对数据进行分组,并计算每个类别的行为次数。
3. 将处理后的数据写入 MySQL
为了将上述聚合结果写入 MySQL,首先创建一个 MySQL 表。
CREATE TABLE behavior_summary (
category_id BIGINT,
window_start TIMESTAMP(3),
behavior_count BIGINT,
PRIMARY KEY (category_id, window_start) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'behavior_summary',
'username' = 'myuser',
'password' = 'mypassword'
);
然后,可以INSERT INTO
语句将数据插入到 MySQL 表中。
INSERT INTO behavior_summary
SELECT * FROM behavior_count;
4. 将处理后的数据写入 HDFS
如果想将数据写入 HDFS,先创建一个 HDFS 表。
CREATE TABLE behavior_summary_hdfs (
category_id BIGINT,
window_start TIMESTAMP(3),
behavior_count BIGINT
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://localhost:9000/user/flink/behavior_summary',
'format' = 'csv'
);
接着,使用 INSERT INTO
语句将数据写入 HDFS。
INSERT INTO behavior_summary_hdfs
SELECT * FROM behavior_count;
总结
以上步骤展示了如何使用 Flink SQL 从 Kafka 读取数据、进行聚合计算,并将结果分别写入 MySQL 和 HDFS。这是一个基本的流程,根据实际需求,可以调整表结构、连接器配置以及 SQL 查询以适应不同的应用场景。
原文地址:https://blog.csdn.net/qq_41081716/article/details/143508790
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!