自学内容网 自学内容网

paimon实战 -- 如何使用partial update实现并发写入paimon宽表代替双流join

背景

         在湖仓业务开发中,进行宽表开发是常见的业务需求, 常规方式都是通过双流或者多流进行join实现将多个表的数据合成一条数据写入宽表,但是join的方式会导致flink任务状态很大,任务不好维护。为了解决状态大的问题paimon提供了partial update,可以不使用join,完成多个insert into 写入或更新同一张表的同一条数据,本文将详细介绍partial update的使用方式和注意事项。

Partial Update

         建表时指定'merge-engine' = 'partial-update',那么就会使用部分更新表引擎,可以做到多个 Flink 流任务去更新同一张表,每条流任务只更新一张表的部分列,最终实现一行完整的数据的更新,对于需要拉宽表的业务场景,partial-update 非常适合此场景,而且构建宽表的操作也相对简单。这里所说的多个 Flink 流任务并不是指多个 Flink Job 并发写同一张 Paimon 表,这样需要拆分 Compaction 任务,就不能在每个 Job 的 Writer 端做 Compaction, 需要一个独立的 Compaction 任务,比较麻烦。目前推荐将多条 Flink 流任务 UNION ALL 起来,启动一个 Job 写 Paimon 表。

注意

1.partial-update 必须跟 lookup 或者 full-compaction changelog producer结合使用。Partial无法接收DELETE消息,可以将partial-update.ignore-delete配置为忽略delete消息。

2.对于流读场景,partial-update 表引擎需要结合 Lookup 或者 full-compaction 的 Changelog Producer 一起使用,同时 partial-update 不能接收和处理 DELETE 消息,为了避免接收到 DELETE 消息报错,需要通过配置 'partial-update.ignore-delete' = 'true' 忽略 DELETE 消息。

Sequence Group

Sequence字段并不能解决多流更新的部分更新表的乱序问题,因为多流更新时 Sequence(序列) 字段可能会被另一个流的最新数据覆盖。
因此我们引入了部分更新表的序列组(Sequence Group)机制。 它可以解决:

  1. 多流更新时出现混乱。 每个流定义其自己的序列组。
  2. 真正的部分更新,而不仅仅是非空更新。

如下所示:

CREATE TABLE T (
  k INT,
  a INT,
  b INT,
  g_1 INT,
  c INT,
  d INT,
  g_2 INT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine'='partial-update',
  'fields.g_1.sequence-group'='a,b',
  'fields.g_2.sequence-group'='c,d'
);

INSERT INTO T VALUES (1, 1, 1, 1, 1, 1, 1);

-- g_2 is null, c, d should not be updated
INSERT INTO T VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));

SELECT * FROM T; -- output 1, 2, 2, 2, 1, 1, 1

-- g_1 is smaller, a, b should not be updated
INSERT INTO T VALUES (1, 3, 3, 1, 3, 3, 3);

SELECT * FROM T; -- output 1, 2, 2, 2, 3, 3, 3

对于 sequence-group,有效的比较数据类型包括:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ。

案例

创建Paimon宽表表

CREATE TABLE if not exists MyTable(
id INT ,
col1_1 STRING,
col1_2 STRING,
col1_3 STRING,
col1_4 STRING,
version1 INT ,
col2_1 STRING,
col2_2 STRING,
col2_3 STRING,
col2_4 STRING,
version2 INT,
col3_1 STRING,
col3_2 STRING,
col3_3 STRING,
col3_4 STRING,
version3 INT ,
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'merge-engine'='partial-update',
'fields.version1.sequence-group'='col1_1,col1_2,col1_3,col1_4',
'fields.version2.sequence-group'='col2_1,col2_2,col2_3,col2_4',
'fields.version3.sequence-group'='col3_1,col3_2,col3_3,col3_4',
'bucket' = '2'
);


DML语句准备
//三个程序分别执行下面三个语句
 

insert into MyTable
(id,
col1_1,
col1_2,
col1_3,
col1_4,
version1,
col2_1,
col2_2,
col2_3,
col2_4,
version2,
col3_1,
col3_2,
col3_3,
col3_4,
version3)
select
id,
col1_1,
col1_2,
col1_3,
col1_4,
version1,
CAST(NULL AS String)as col2_1,
CAST(NULL AS String)as col2_2,
CAST(NULL AS String)as col2_3,
CAST(NULL AS String)as col2_4,
CAST(NULL AS INT)as version2,
CAST(NULL AS String)as col3_1,
CAST(NULL AS String)as col3_2,
CAST(NULL AS String)as col3_3,
CAST(NULL AS String)as col3_4,
CAST(NULL AS INT)as version3
from
InputTable;

//
insert into MyTable (id  ,
col1_1 ,
col1_2 ,
col1_3 ,
col1_4 ,
version1   ,
col2_1 ,
col2_2 ,
col2_3 ,
col2_4 ,
version2  ,
col3_1 ,
col3_2 ,
col3_3 ,
col3_4 ,
version3)  select 
 id  ,
CAST(NULL AS String) as col1_1 ,
CAST(NULL AS String) as col1_2 ,
CAST(NULL AS String) as col1_3 ,
CAST(NULL AS String) as col1_4 ,
CAST(NULL AS INT) version1   ,
col2_1 ,
col2_2 ,
col2_3 ,
col2_4 ,
 version2  ,
CAST(NULL AS String) as col3_1 ,
CAST(NULL AS String) as col3_2 ,
CAST(NULL AS String) as col3_3 ,
CAST(NULL AS String) as col3_4 ,
CAST(NULL AS INT) as version3  
from 
InputTable;
//
insert into MyTable (id  ,
col1_1 ,
col1_2 ,
col1_3 ,
col1_4 ,
version1   ,
col2_1 ,
col2_2 ,
col2_3 ,
col2_4 ,
version2  ,
col3_1 ,
col3_2 ,
col3_3 ,
col3_4 ,
version3)  select 
id  ,
CAST(NULL AS String) as col1_1 ,
CAST(NULL AS String) as col1_2 ,
CAST(NULL AS String) as col1_3 ,
CAST(NULL AS String) as col1_4 ,
CAST(NULL AS INT) version1   ,
CAST(NULL AS String) as col2_1 ,
CAST(NULL AS String) as col2_2 ,
CAST(NULL AS String) as col2_3 ,
CAST(NULL AS String) as col2_4 ,
CAST(NULL AS INT) as version2  ,
col3_1 ,
col3_2 ,
col3_3 ,
col3_4 ,
version3  
from 
InputTable;


原文地址:https://blog.csdn.net/aA518189/article/details/144435485

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!