使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程
使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程
实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。
- 1. 从源数据库(MySQL和Oracle)实时抽取数据。
- 2. 对数据进行清洗和转换。
- 3. 将转换后的数据写入目标数据库(MySQL)。
我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。
环境准备
- 确保MySQL和Oracle数据库运行**,并创建相应的表。
- 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。
第一步:创建源和目标数据库表
假设我们有以下三个表:
- source_mysql_table(MySQL中的源表)
- source_oracle_table(Oracle中的源表)
- target_table(目标MySQL表)
MySQL源表
CREATE DATABASE source_mysql_db;
USE source_mysql_db;
CREATE TABLE source_mysql_table (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
action VARCHAR(255) NOT NULL,
timestamp VARCHAR(255) NOT NULL
);
Oracle源表
CREATE TABLE source_oracle_table (
id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,
user_id VARCHAR2(255) NOT NULL,
action VARCHAR2(255) NOT NULL,
timestamp VARCHAR2(255) NOT NULL,
PRIMARY KEY (id)
);
目标MySQL表
CREATE DATABASE target_db;
USE target_db;
CREATE TABLE target_table (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
action VARCHAR(255) NOT NULL,
timestamp VARCHAR(255) NOT NULL
);
第二步:添加项目依赖
在pom.xml中添加Flink、MySQL和Oracle相关的依赖:
<dependencies>
<!-- Spring Boot dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
<!-- Oracle JDBC driver -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>19.8.0.0</version>
</dependency>
</dependencies>
第三步:编写Flink ETL任务
创建一个Flink任务类来实现ETL逻辑。
创建一个POJO类表示数据结构
package com.example.flink;
public class UserAction {
private int id;
private String userId;
private String action;
private String timestamp;
// Getters and setters
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
}
编写Flink任务类
package com.example.flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@Component
public class FlinkETLJob implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从MySQL读取数据
DataStream<UserAction> mysqlDataStream = env.addSource(new MySQLSource());
// 从Oracle读取数据
DataStream<UserAction> oracleDataStream = env.addSource(new OracleSource());
// 合并两个数据流
DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);
// 清洗和转换数据
DataStream<UserAction> transformedStream = mergedStream.map(new MapFunction<UserAction, UserAction>() {
@Override
public UserAction map(UserAction value) throws Exception {
// 进行清洗和转换
value.setAction(value.getAction().toUpperCase());
return value;
}
});
// 将数据写入目标MySQL数据库
transformedStream.addSink(new MySQLSink());
// 执行任务
env.execute("Flink ETL Job");
}
public static class MySQLSource implements SourceFunction<UserAction> {
private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";
private static final String JDBC_USER = "source_user";
private static final String JDBC_PASSWORD = "source_password";
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<UserAction> ctx) throws Exception {
try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
while (isRunning) {
String sql = "SELECT * FROM source_mysql_table";
try (PreparedStatement statement = connection.prepareStatement(sql);
ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
UserAction userAction = new UserAction();
userAction.setId(resultSet.getInt("id"));
userAction.setUserId(resultSet.getString("user_id"));
userAction.setAction(resultSet.getString("action"));
userAction.setTimestamp(resultSet.getString("timestamp"));
ctx.collect(userAction);
}
}
Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次
}
}
}
@Override
public void cancel() {
isRunning = false;
}
}
public static class OracleSource implements SourceFunction<UserAction> {
private static final String JDBC_URL = "jdbc:oracle:thin:@localhost:1521:orcl";
private static final String JDBC_USER = "source_user";
private static final String JDBC_PASSWORD = "source_password";
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<UserAction> ctx) throws Exception {
try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
while (isRunning) {
String sql = "SELECT * FROM source_oracle_table";
try (PreparedStatement statement = connection.prepareStatement(sql);
ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
UserAction userAction = new UserAction();
userAction.setId(resultSet.getInt("id"));
userAction.setUserId(resultSet.getString("user_id"));
userAction.setAction(resultSet.getString("action"));
userAction.setTimestamp(resultSet.getString("timestamp"));
ctx.collect(userAction);
}
}
Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次
}
}
}
@Override
public void cancel() {
isRunning = false;
}
}
public static class MySQLSink extends RichFlatMapFunction<UserAction, Void> {
private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";
private static final String JDBC_USER = "target_user";
private static final String JDBC_PASSWORD = "target_password";
private transient Connection connection;
private transient PreparedStatement statement;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";
statement = connection.prepareStatement(sql);
}
@Override
public void flatMap(UserAction value, Collector<Void> out) throws Exception {
statement.setString(1, value.getUserId());
statement.setString(2, value.getAction());
statement.setString(3, value.getTimestamp());
statement.executeUpdate();
}
@Override
public void close() throws Exception {
super.close();
if (statement != null) {
statement.close();
}
if (connection != null) {
connection.close();
}
}
}
}
第四步:配置Spring Boot
在application.properties中添加必要的配置:
# Spring Boot configuration
server.port=8080
第五步:运行和测试
- 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
- 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
- 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。
总结
通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。
原文地址:https://blog.csdn.net/qq_38411796/article/details/139770741
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!