自学内容网 自学内容网

008.精读《Apache Paimon Docs - Table w/o PK》

1. 引言

通过本文,上篇我们了解了Apache Paimon 主键表,本期我们将继续学习附加表Append Only Table) 我们将带领读者《 《Apache Paimon Docs - Table w/o PK》》 继续剖析 Paimon 的仅追加表相关知识。

通过本文你将了解到:

  1. Paimon 附加表相关的基本概念,了解什么是附加表,它在Paimon中扮演什么角色,以及它如何与主键表区分开来。
  2. 及其适用场景,探索附加表在实际应用中的多样化场景。
  3. 数据查询更新方式,从高效的数据合并策略到灵活的流式查询配置,以及如何通过索引和文件索引优化查询性能。

2. 基本概念

2.1 定义

如果一个表没有定义主键,那它就是一个附加表(Append Table)。与主键表相比,附加表无法直接接收变更日志,也不能直接通过 upsert 更新数据,只能接收附加数据。

CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    -- 'target-file-size' = '256 MB',
    -- 'file.format' = 'parquet',
    -- 'file.compression' = 'zstd',
    -- 'file.compression.zstd-level' = '3'
);

2.2 使用场景

使用场景或优势说明
批量写入和批量读取类似于常规的 Hive 分区表,适用于大规模数据的批量处理。
友好的对象存储良好的兼容性和适应性,支持 S3、OSS 等对象存储。
时间穿越和回滚支持数据的时间旅行和回滚功能,方便数据的历史查询和恢复。
低成本的删除和更新在批量数据操作中,能够以较低的计算和资源成本进行删除和更新操作。
流式接收中的小文件自动合并在流式写入过程中,自动处理小文件合并,减少存储碎片。
队列形式的流式读写支持如队列般的流式读写操作,可以像消息队列一样处理数据。
高性能查询通过顺序和索引实现的高效查询性能。

批量写入和读取

CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'target-file-size' = '256 MB',                     -- 设置目标文件大小
    'file.format' = 'parquet',                         -- 文件格式为 Parquet
    'file.compression' = 'zstd',                       -- 使用 ZSTD 压缩
    'file.compression.zstd-level' = '3'                -- 设置 ZSTD 压缩级别为 3
);

流式接收和小文件合并

CREATE TABLE my_stream_table (
    event_id BIGINT,
    event_time TIMESTAMP,
    event_data STRING
) WITH (
    'target-file-size' = '128 MB',                     -- 设置目标文件大小
    'file.format' = 'avro',                            -- 文件格式为 Avro
    'file.compression' = 'snappy',                     -- 使用 Snappy 压缩
    'streaming.min-batch-interval' = '5 min'           -- 设置流处理最小批处理时间间隔为 5 分钟
);

具有以下的优点:

功能特性技术优势实现
对象存储友好良好的兼容性和适应性,支持 S3、OSS 等对象存储。通过接入主流对象存储服务,优化读写性能和兼容性,特别是大规模数据存储和处理场景下。
时间穿越和回滚支持数据的时间旅行和回滚功能,方便数据的历史查询和恢复。利用快照和元数据管理,实现任意时间点的数据查询和回滚能力。
低成本的删除和更新在批量数据操作中,能够以较低的计算和资源成本进行删除和更新操作。通过高效的数据合并和变更处理机制,优化批量操作中的资源消耗。
小文件合并在流式写入过程中,自动处理小文件合并,减少存储碎片。在流式写入过程中,使用异步任务定期合并小文件,确保合理的文件大小和存储效率。
高性能查询通过顺序和索引实现的高效查询性能。通过索引构建和数据排序,提升查询的响应速度和资源利用效率。

3. 流式处理

附加表(Append Table)可以通过 Flink 进行非常灵活的流式写入,并可以像队列一样通过 Flink 进行读取。唯一的区别是其延迟为分钟级别,但其优势在于非常低的成本以及能够进行过滤和投影下推。

3.1 自动小文件合并

在流式写入作业中,如果没有定义分桶(bucket),写入器不会进行压缩;相反,将使用压缩协调器(Compact Coordinator)扫描小文件并将压缩任务传递给压缩工作者(Compact Worker)。流式模式下,如果在 Flink 中运行插入 SQL,拓扑结构将如下所示:

Source -> Transformations -> Sink
        -> Compact Coordinator -> Compact Worker
  • 无反压:压缩任务不会引起反压。
  • 写入模式:如果设置 write-onlytrue,压缩协调器和压缩工作者将在拓扑中被移除。
  • Flink 流模式:自动压缩仅在 Flink 引擎的流模式下被支持。可以通过 Paimon 在 Flink 中启动压缩作业,并通过设置 write-only 禁用所有其他压缩。

3.2 流式查询

附加表可以像消息队列一样使用,进行流式查询,与主键表类似,有两个选项可以进行流式读取:

  1. 默认模式:流式读取在首次启动时生成表的最新快照,并继续读取最新的增量记录。
  2. 增量模式:可以指定 scan.modescan.snapshot-idscan.timestamp-millisscan.file-creation-time-millis 进行增量读取。

类似 Flink-Kafka,默认情况下不保证顺序。如果数据需要某种顺序,也需要考虑定义桶键(bucket-key),请参考分桶附加(Bucketed Append)部分。

流式写入和自动小文件合并

CREATE TABLE my_stream_table (
    event_id BIGINT,
    event_time TIMESTAMP,
    event_data STRING
) WITH (
    'target-file-size' = '128 MB',                     -- 设置目标文件大小
    'file.format' = 'avro',                            -- 文件格式为 Avro
    'file.compression' = 'snappy',                     -- 使用 Snappy 压缩
    'streaming.min-batch-interval' = '5 min'           -- 设置流处理最小批处理时间间隔为 5 分钟
);

在流式写入过程中,配置 Compact CoordinatorCompact Worker 以确保小文件自动合并。

流式查询配置(默认模式)

SET 'scan.startup.mode' = 'latest-offset';            -- 设置流式读取从最新的快照开始

流式查询配置(增量模式)

SET 'scan.mode' = 'incremental';                      -- 设置流式读取为增量模式
SET 'scan.snapshot-id' = '1234567890';                -- 可选:指定从特定快照 ID 开始
SET 'scan.timestamp-millis' = '1627849923000';        -- 可选:指定从特定时间戳(毫秒)开始

流式查询配置(带顺序要求)

CREATE TABLE ordered_stream_table (
    event_id BIGINT,
    event_time TIMESTAMP,
    event_data STRING
) WITH (
    'target-file-size' = '128 MB',
    'file.format' = 'parquet',
    'file.compression' = 'zstd',
    'streaming.min-batch-interval' = '5 min',
    'bucket-key' = 'event_time'                        -- 设置桶键(bucket-key)以确保数据按照时间顺序
);

技术优势及其实现

功能特性技术优势实现
流式写入通过灵活的配置选项,实现分钟级别低延迟的流式写入,并支持过滤和投影下推,提升查询效率。通过灵活的配置选项,优化数据流的写入路径,减少延迟,并通过下推操作提升查询效率。
自动小文件合并在流式处理过程中,动态管理文件大小,减少存储碎片,提高存储效率。使用动态文件管理策略,自动合并小文件,以优化存储空间和提高I/O效率。
流式读取支持从最新快照读取或增量读取,类似消息队列的使用,方便实时数据处理和分析。提供快照和增量读取功能,使得流式读取更加灵活,适用于实时数据处理场景。
顺序保证通过配置桶键,可以确保数据在需要顺序的情境下有序读取和写入,满足业务需求。通过桶键配置,实现数据的有序存储和检索,保证业务逻辑的顺序性。

4. 数据更新

4.1 查询

按顺序跳过数据

Paimon 默认在清单文件中记录每个字段的最大值和最小值。在查询时,根据查询的 WHERE 条件,通过清单中的统计信息进行文件过滤。如果过滤效果良好,查询时间可以从分钟级别加速到毫秒级别。

然而,数据分布并不总是能有效过滤,因此如果可以根据 WHERE 条件中的字段对数据进行排序,将会更高效。可以参考 Flink 的 COMPACT Action 或 COMPACT Procedure,以及 Spark 的 COMPACT Procedure。

-- 对数据进行排序以优化按顺序跳过数据的查询性能
ALTER TABLE my_table COMPACT BY (field_name);

按文件索引跳过数据

还可以使用文件索引,它将在读取端通过索引过滤文件。

CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'file-index.bloom-filter.columns' = 'product_id',
    'file-index.bloom-filter.product_id.items' = '200'
);

定义 file-index.bloom-filter.columns 后,Paimon 将为每个文件创建相应的索引文件。如果索引文件太小,它将直接存储在清单中,否则将存储在数据文件的目录中。每个数据文件对应一个索引文件,该文件有独立的定义,可以包含不同类型的多列索引。

文件索引的应用场景

不同文件索引在不同场景下效率不同。例如:

  • 布隆过滤器(Bloom Filter):在点查找场景中可能加速查询。
  • 位图(Bitmap):可能消耗更多空间,但精度更高。

目前,文件索引仅支持附加表(Append-Only Table)。

布隆过滤器的配置

  • file-index.bloom-filter.columns:指定需要布隆过滤器索引的列。
  • file-index.bloom-filter.<column_name>.fpp:配置错误正率(False Positive Probability)。
  • file-index.bloom-filter.<column_name>.items:配置一个数据文件中预期的不同项目数量。

位图的配置

  • file-index.bitmap.columns:指定需要位图索引的列。

添加文件索引到现有表

如果你想在不重写的情况下添加文件索引,可以使用 rewrite_file_index 过程。在使用该过程之前,你应该在目标表中配置适当的配置。可以使用 ALTER 子句来配置 file-index.<filter-type>.columns

使用示例:添加文件索引到现有表

ALTER TABLE my_table
SET ('file-index.bloom-filter.columns' = 'product_id');

CALL rewrite_file_index('my_table');

4.2 更新

目前,仅 Spark SQL 支持 DELETE 和 UPDATE 操作,可以参考 Spark Write 的相关文档。

DELETE FROM my_table
WHERE currency = 'UNKNOWN';

更新模式

附加表(Append Table)有两种更新模式:

  1. COW(Copy on Write)

    • 机制:搜索命中的文件,然后重新写入每个文件以移除需要删除的数据。
    • 成本:这种操作成本高,因为每次删除或更新都需要重新写入整个文件。
  2. MOW(Merge on Write)

    • 机制:通过指定 'deletion-vectors.enabled' = 'true',启用删除向量模式(Deletion Vectors)。只标记对应文件的某些记录为删除,并写入删除文件,而不需要重新写入整个文件。
    • 优势:相比 COW 模式,MOW 模式的删除和更新成本更低,因为只需写入小的删除文件,而不需要重写全部数据文件。

在创建或更新表时,可以启用删除向量模式:

CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'deletion-vectors.enabled' = 'true'
);

或在现有表上启用删除向量:

ALTER TABLE my_table
SET ('deletion-vectors.enabled' = 'true');

MOW 模式下的 DELETE 操作

DELETE FROM my_table
WHERE currency = 'UNKNOWN';

此操作将标记 currency 为 ‘UNKNOWN’ 的记录为删除,而不重写整个文件。

通过 Spark SQL 进行更新操作

val spark = SparkSession.builder()
    .appName("UpdateExample")
    .getOrCreate()

// 启用删除向量
spark.sql("ALTER TABLE my_table SET ('deletion-vectors.enabled' = 'true')")

// 执行 DELETE 操作
spark.sql("DELETE FROM my_table WHERE currency = 'UNKNOWN'")

// 执行 UPDATE 操作
spark.sql("UPDATE my_table SET price = price * 1.1 WHERE product_id = 1001")

4.3 分桶附加表

您可以定义 bucketbucket-key 以创建一个分桶附加表。在这种表中,不同桶内的数据是严格有序的,流式读取将按写入顺序准确地传输记录。这样可以优化数据处理和查询性能。

--创建分桶附加表
CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'bucket' = '8',
    'bucket-key' = 'product_id'
);

一个普通的附加表的流式写读取没有严格的顺序保证,但是有些情况下需要定义一个类似于 Kafka 的键。

  • 每个分桶中的记录都是严格有序的:流式读取将按写入顺序准确地传输记录。无需配置特殊的设置,所有数据将按队列形式进入一个桶内。

分桶中的压缩(Compaction in Bucket)

默认情况下,Sink 节点会自动执行压缩以控制文件数量。以下选项控制压缩策略:

KeyDefaultTypeDescription
write-onlyfalseBoolean如果设置为 true,将跳过压缩和快照过期操作。此选项与专用压缩作业一起使用。
compaction.min.file-num5Integer对文件集 [f_0,…,f_N],满足 sum(size(f_i)) >= targetFileSize 的最小文件数量以触发附加表的压缩。避免压缩几乎满的文件,因为这不合算。
compaction.max.file-num5Integer对文件集 [f_0,…,f_N],即使 sum(size(f_i)) < targetFileSize,也触发压缩的最大文件数量。此值避免过多小文件积压,减慢性能。
full-compaction.delta-commits(none)Integer在 delta 提交后会不断触发全量压缩。

流式读取顺序(Streaming Read Order)

对于流式读取,记录按以下顺序生产:

  • 跨分区记录:如果 scan.plan-sort-partition 设置为 true,则首先生产分区值较小的记录。否则,先生产创建时间较早的分区的记录。
  • 同分区同桶记录:首先生产先写入的记录。
  • 同分区不同桶记录:不同桶由不同任务处理,不保证顺序。

水印定义(Watermark Definition)

CREATE TABLE t (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);

启动有界流任务读取 Paimon 表

SELECT window_start, window_end, COUNT(`user`) 
FROM TABLE(
    TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)
) 
GROUP BY window_start, window_end;

还可以启用 Flink 水印对齐,确保没有来源/分片/分区的水印前进太快:

KeyDefaultTypeDescription
scan.watermark.alignment.group(none)String要对齐水印的一组源。
scan.watermark.alignment.max-drift(none)Duration对齐水印的最大漂移,在此漂移前暂停从源/任务/分区消费。

有界流(Bounded Stream)

流式来源(Streaming Source)也可以是有界的,可以通过指定 scan.bounded.watermark 来定义有界流模式的结束条件。

--创建 Kafka 表和启动流式插入及读取作业
CREATE TABLE kafka_table (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);

-- 启动流式插入作业
INSERT INTO paimon_table 
SELECT * FROM kafka_table;

-- 启动有界流任务读取 Paimon 表
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

批处理(Batch)

分桶表可以在批处理查询中避免 shuffle,例如可以用以下 Spark SQL 读取 Paimon 表:

SET spark.sql.sources.v2.bucketing.enabled = true;

CREATE TABLE FACT_TABLE (
    order_id INT, 
    f1 STRING
) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');

CREATE TABLE DIM_TABLE (
    order_id INT, 
    f2 STRING
) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');

SELECT * 
FROM FACT_TABLE 
JOIN DIM_TABLE 
ON FACT_TABLE.order_id = DIM_TABLE.order_id;

通过设置 spark.sql.sources.v2.bucketing.enabled 为 true,Spark 将识别 V2 数据源报告的特定分布,并在必要时尝试避免 shuffle。如果两个表具有相同的分桶策略和相同数量的桶,昂贵的 join shuffle 操作将被避免。

5 总结

本文详细介绍了Apache Paimon中附加表的概念和应用。我们首先定义了什么是附加表,并比较了它与主键表的区别。接着,我们探讨了附加表在不同场景下的使用,包括批量写入和读取、对象存储的友好性、时间穿越和回滚功能、低成本的删除和更新操作、流式接收中小文件的自动合并、队列形式的流式读写以及高性能查询。此外,我们还详细介绍了流式处理的相关技术,包括自动小文件合并、流式查询的不同模式、顺序保证的重要性以及分桶附加表的优势。最后,我们讨论了数据更新策略,包括DELETE和UPDATE操作,以及如何通过配置优化查询性能。

如果你想参与讨论,请 点击这里👉https://github.com/hiszm/BigDataWeekly,每周都有新的主题,周末或周一发布。

大数据精读,探索知识的深度。

关注 大数据精读周刊

版权声明:自由转载-非商用-非衍生-保持署名([创意共享 3.0 许可证](https://creativecommons.org/licenses/by-nc-nd/3.0/deed.e


原文地址:https://blog.csdn.net/jankin6/article/details/144329389

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!