自学内容网 自学内容网

如何基于 Apache SeaTunnel 同步数据到 Iceberg

概述

Apache SeaTunnel

Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的开源数据集成平台, 支持spark、flink 及自研 Zeta 引擎,有庞大的用户社群.

Apache Iceberg

Apache Iceberg 是一个开源的表格格式,它旨在改善大数据生态系统中复杂的数据湖管理。作为Apache软件基金会的一部分,Iceberg专为提供更强大、更灵活的数据湖表格管理功能而设计,它通过提供一种更加高效和可靠的方式来处理大规模数据集,从而解决了传统数据湖在数据可靠性、性能和可维护性方面的挑战。

主要特点
  1. 模式演变和兼容性:Iceberg支持模式的演变,同时保证了向前和向后的兼容性。这使得在不破坏已有数据的情况下添加、删除、更新字段成为可能。
  2. 隐藏分区:分区信息作为表模式的一部分进行存储,这消除了需要手动管理分区目录的复杂性。分区对查询透明,即可进行常规查询而无需指定具体的分区。
  3. 多计算引擎支持:Iceberg可以与现代计算引擎无缝集成,包括Apache Spark、Apache Flink、PrestoDB 和 Trino等。同一数据集可以被多个引擎并发访问且保持一致性。
  4. 存储引擎支持: HDFS / S3
  5. 原子操作:Iceberg支持原子性写入操作。这意味着表更新要么全部成功,要么全部失败,确保了数据的一致性。
  6. 快照管理:支持表的快照功能,允许用户回滚到历史版本,以及进行增量读取操作。这对于数据恢复和审计尤为重要。
  7. 高效读写:通过提供文件层面的元数据,使得读写操作可以更高效地进行。该功能减少了需要扫描的数据量,改善了查询性能。
使用场景
  • 数据湖构建和管理:对于需要构建和管理大型数据湖的企业和组织,Apache Iceberg提供了一个高效、可扩展且易于管理的解决方案。
  • 多计算引擎环境:在使用多个计算引擎进行数据处理的环境中,Iceberg能够提供一致的数据视图和并发控制。
  • 数据科学和分析:提供了更强大且灵活的数据组织方式,使得进行复杂分析和数据科学项目更加容易。

SeaTunnel Iceberg sink

介绍

Apache SeaTunnel connector-Iceberg 是专门为Iceberg引擎开发的数据同步组件, 主要为了方便SeaTunnel 用户能更加友好的使用Iceberg来构建企业级数据湖仓

Iceberg sink 特性

  • 支持数据批量数据写入
  • cdc模式下的数据同步
  • 支持配置自动建表
  • 支持schema evolution
  • 支持指定分区键
  • 支持数据提交到指定的branch

Sink参数配置

NameTypeRequiredDefaultDescription
catalog_namestringyesdefault用户指定的目录名称。默认为 default。
namespacestringyesdefaultIceberg 数据库名称。默认为 default
tablestringyes-Iceberg 表名称。
iceberg.catalog.configmapno-指定用于初始化 Iceberg 目录的属性,具体配置参考:Iceberg Catalog Properties
hadoop.configmapno-指定 Hadoop 配置的属性,具体配置参考: Hadoop Configuration
iceberg.hadoop-conf-pathstringno-指定加载 'core-site.xml'、'hdfs-site.xml'、'hive-site.xml' 文件的路径。
case_sensitivebooleannofalse控制是否以区分大小写的方式匹配 schema。
iceberg.table.write-propsmapno-传递给 Iceberg 写入器初始化的属性,这些属性具有优先权,可以在 Iceberg Write Properties 找到具体参数。
iceberg.table.auto-create-propsmapno-Iceberg 在自动创建表时指定的配置, 具体参照: Table Behavior Properties
iceberg.table.schema-evolution-enabledbooleannofalse将其设置为 true 可以使 Iceberg 表在同步过程中支持模式演变。目前仅支持添加字段 和 部分类型变更
iceberg.table.primary-keysstringno-表的主键配置,多个主键用","分割 ,与 "iceberg.table.upsert-mode-enabled" 一起使用,用于同主键数据的增量更新
iceberg.table.upsert-mode-enabledbooleannofalse将其设置为 true 以启用 upsert 模式,默认为 false, 用于 Iceberg 中数据的增量更新
iceberg.table.partition-keysstringno-创建表时指定的分区字段,多个分区字段使用","分隔。
iceberg.table.commit-branchstringno-指定数据提交的分支

同步模式

批处理

  • 批模式数据导入, append模式 ,不进行数据的增量更新
  • 支持 flink , spark ,zeta 引擎
env {
  parallelism = 1
  job.mode = "BATCH"

  # You can set spark configuration here
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  FakeSource {
    row.num = 100
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
    result_table_name = "fake"
  }
}

transform {
}

sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      "type"="hadoop"
      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=10
    }
    iceberg.table.partition-keys="c_timestamp"
    case_sensitive=true
  }
}

流写入(CDC)

  • 配置mysql cdc 进行数据的增量采集
  • Sink 指定 iceberg.table.primary-keys 和 iceberg.table.upsert-mode-enabled=true 进行数据增量写入
  • 配置 iceberg.table.schema-evolution-enabled=true 支持 schema 的演进(当前仅支持增加字段和部分类型变更)
  • 支持 flink / zeta 引擎的数据同步,不支持 spark
    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }
    

source { MySQL-CDC { result_table_name="customer_result_table" catalog { factory = Mysql } debezium = { # include ddl "include.schema.changes" = true } database-names=["mysql_cdc"] table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] format=DEFAULT username = "st_user" password = "seatunnel" base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" } }

transform { }

sink { Iceberg { catalog_name="seatunnel_test" iceberg.catalog.config={ "type"="hadoop" "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-cdc-sink/" } namespace="seatunnel_namespace" table="iceberg_sink_table" iceberg.table.write-props={ write.format.default="parquet" write.target-file-size-bytes=10 } iceberg.table.primary-keys="id" iceberg.table.partition-keys="f_datetime" iceberg.table.upsert-mode-enabled=true iceberg.table.schema-evolution-enabled=true case_sensitive=true } }

```

总结

基于Apache SeaTunnel来构建数据湖项目, 我们可以直接引用 SeaTunnel 强大的组件生态,不用独立构造新的项目来实现业务需求,同时Apache SeaTunnel的标准的架构设计也为熟悉开源的朋友提供了快速独立扩展的机会,可以在此基础上快速扩展自己的需求,做出符合自己业务需要的组件, 也欢迎大家试用 Iceberg-connect , 希望能帮大家真正解决实际生产场景中遇到的问题,

也希望大家能积极反馈使用中的问题,并贡献场景,大家共同来解决,并促进 Iceberg-connect 组件的完善, 一起共创数据开发的新场景.

本文由 白鲸开源科技 提供发布支持!


原文地址:https://blog.csdn.net/weixin_54625990/article/details/140527756

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!