如何基于 Apache SeaTunnel 同步数据到 Iceberg
概述
Apache SeaTunnel
Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的开源数据集成平台, 支持spark、flink 及自研 Zeta 引擎,有庞大的用户社群.
Apache Iceberg
Apache Iceberg 是一个开源的表格格式,它旨在改善大数据生态系统中复杂的数据湖管理。作为Apache软件基金会的一部分,Iceberg专为提供更强大、更灵活的数据湖表格管理功能而设计,它通过提供一种更加高效和可靠的方式来处理大规模数据集,从而解决了传统数据湖在数据可靠性、性能和可维护性方面的挑战。
主要特点
- 模式演变和兼容性:Iceberg支持模式的演变,同时保证了向前和向后的兼容性。这使得在不破坏已有数据的情况下添加、删除、更新字段成为可能。
- 隐藏分区:分区信息作为表模式的一部分进行存储,这消除了需要手动管理分区目录的复杂性。分区对查询透明,即可进行常规查询而无需指定具体的分区。
- 多计算引擎支持:Iceberg可以与现代计算引擎无缝集成,包括Apache Spark、Apache Flink、PrestoDB 和 Trino等。同一数据集可以被多个引擎并发访问且保持一致性。
- 存储引擎支持: HDFS / S3
- 原子操作:Iceberg支持原子性写入操作。这意味着表更新要么全部成功,要么全部失败,确保了数据的一致性。
- 快照管理:支持表的快照功能,允许用户回滚到历史版本,以及进行增量读取操作。这对于数据恢复和审计尤为重要。
- 高效读写:通过提供文件层面的元数据,使得读写操作可以更高效地进行。该功能减少了需要扫描的数据量,改善了查询性能。
使用场景
- 数据湖构建和管理:对于需要构建和管理大型数据湖的企业和组织,Apache Iceberg提供了一个高效、可扩展且易于管理的解决方案。
- 多计算引擎环境:在使用多个计算引擎进行数据处理的环境中,Iceberg能够提供一致的数据视图和并发控制。
- 数据科学和分析:提供了更强大且灵活的数据组织方式,使得进行复杂分析和数据科学项目更加容易。
SeaTunnel Iceberg sink
介绍
Apache SeaTunnel connector-Iceberg 是专门为Iceberg引擎开发的数据同步组件, 主要为了方便SeaTunnel 用户能更加友好的使用Iceberg来构建企业级数据湖仓
Iceberg sink 特性
- 支持数据批量数据写入
- cdc模式下的数据同步
- 支持配置自动建表
- 支持schema evolution
- 支持指定分区键
- 支持数据提交到指定的branch
Sink参数配置
Name | Type | Required | Default | Description |
---|---|---|---|---|
catalog_name | string | yes | default | 用户指定的目录名称。默认为 default。 |
namespace | string | yes | default | Iceberg 数据库名称。默认为 default |
table | string | yes | - | Iceberg 表名称。 |
iceberg.catalog.config | map | no | - | 指定用于初始化 Iceberg 目录的属性,具体配置参考:Iceberg Catalog Properties |
hadoop.config | map | no | - | 指定 Hadoop 配置的属性,具体配置参考: Hadoop Configuration |
iceberg.hadoop-conf-path | string | no | - | 指定加载 'core-site.xml'、'hdfs-site.xml'、'hive-site.xml' 文件的路径。 |
case_sensitive | boolean | no | false | 控制是否以区分大小写的方式匹配 schema。 |
iceberg.table.write-props | map | no | - | 传递给 Iceberg 写入器初始化的属性,这些属性具有优先权,可以在 Iceberg Write Properties 找到具体参数。 |
iceberg.table.auto-create-props | map | no | - | Iceberg 在自动创建表时指定的配置, 具体参照: Table Behavior Properties |
iceberg.table.schema-evolution-enabled | boolean | no | false | 将其设置为 true 可以使 Iceberg 表在同步过程中支持模式演变。目前仅支持添加字段 和 部分类型变更 |
iceberg.table.primary-keys | string | no | - | 表的主键配置,多个主键用","分割 ,与 "iceberg.table.upsert-mode-enabled" 一起使用,用于同主键数据的增量更新 |
iceberg.table.upsert-mode-enabled | boolean | no | false | 将其设置为 true 以启用 upsert 模式,默认为 false, 用于 Iceberg 中数据的增量更新 |
iceberg.table.partition-keys | string | no | - | 创建表时指定的分区字段,多个分区字段使用","分隔。 |
iceberg.table.commit-branch | string | no | - | 指定数据提交的分支 |
同步模式
批处理
- 批模式数据导入, append模式 ,不进行数据的增量更新
- 支持 flink , spark ,zeta 引擎
env {
parallelism = 1
job.mode = "BATCH"
# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}
source {
FakeSource {
row.num = 100
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
result_table_name = "fake"
}
}
transform {
}
sink {
Iceberg {
catalog_name="seatunnel_test"
iceberg.catalog.config={
"type"="hadoop"
"warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
}
namespace="seatunnel_namespace"
table="iceberg_sink_table"
iceberg.table.write-props={
write.format.default="parquet"
write.target-file-size-bytes=10
}
iceberg.table.partition-keys="c_timestamp"
case_sensitive=true
}
}
流写入(CDC)
- 配置mysql cdc 进行数据的增量采集
- Sink 指定 iceberg.table.primary-keys 和 iceberg.table.upsert-mode-enabled=true 进行数据增量写入
- 配置 iceberg.table.schema-evolution-enabled=true 支持 schema 的演进(当前仅支持增加字段和部分类型变更)
- 支持 flink / zeta 引擎的数据同步,不支持 spark
env { parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 5000 }
source { MySQL-CDC { result_table_name="customer_result_table" catalog { factory = Mysql } debezium = { # include ddl "include.schema.changes" = true } database-names=["mysql_cdc"] table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] format=DEFAULT username = "st_user" password = "seatunnel" base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" } }
transform { }
sink { Iceberg { catalog_name="seatunnel_test" iceberg.catalog.config={ "type"="hadoop" "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-cdc-sink/" } namespace="seatunnel_namespace" table="iceberg_sink_table" iceberg.table.write-props={ write.format.default="parquet" write.target-file-size-bytes=10 } iceberg.table.primary-keys="id" iceberg.table.partition-keys="f_datetime" iceberg.table.upsert-mode-enabled=true iceberg.table.schema-evolution-enabled=true case_sensitive=true } }
```
总结
基于Apache SeaTunnel来构建数据湖项目, 我们可以直接引用 SeaTunnel 强大的组件生态,不用独立构造新的项目来实现业务需求,同时Apache SeaTunnel的标准的架构设计也为熟悉开源的朋友提供了快速独立扩展的机会,可以在此基础上快速扩展自己的需求,做出符合自己业务需要的组件, 也欢迎大家试用 Iceberg-connect , 希望能帮大家真正解决实际生产场景中遇到的问题,
也希望大家能积极反馈使用中的问题,并贡献场景,大家共同来解决,并促进 Iceberg-connect 组件的完善, 一起共创数据开发的新场景.
本文由 白鲸开源科技 提供发布支持!
原文地址:https://blog.csdn.net/weixin_54625990/article/details/140527756
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!