自学内容网 自学内容网

Clickhouse实战处理(一)集成引擎和Distributed引擎之集成Hive

一、集成引擎

集成引擎集成第三方的存储和系统来读写数据,ClickHouse本身不存储数据。
集成引擎包含:

  1. Kafka
  2. MySQL
  3. ODBC
  4. JDBC
  5. HDF

Kafka引擎

1、Kafka引擎介绍

SELECT查询对于读取消息并不是很有用(除了调试),因为每个消息只能读取一次。
通常,将该引擎结合物化视图一起使用,使用方法:
(1)、使用Kafka引擎创建一个Kafka的消费者,并将其视为一个数据流。
(2)、创建所需结构的表。
(3)、创建一个物化视图,该视图转换来自引擎的数据并将其放入上一步创建的表中。
当物化视图添加至该引擎,它将会在后台收集数据。这就允许你从Kafka持续接收消息并使用
SELECT将数据转换为所需的格式。它们不直接从Kafka中读取数据,而是接收新记录,以block为
单位, 这样就可以写入具有不同详细信息级别的多个表(分组聚合或无聚合)中。
为了提高性能,接收到的消息将被分组为大小为max_insert_block_size的block(块)。如果block没
有在stream_flush_interval_ms时间内形成,则不管block的完整性如何,数据都将刷新到表中。
Kafka引擎
要停止接收topic数据或更改转换逻辑,需detach物化视图。
DETACH TABLE consumer;
ATTACH MATERIALIZED VIEW consumer;
如果要使用ALTER更改目标表,建议禁用物化视图,以避免目标表和该视图中的数据之间出现差异。

2、kafka的参数和示例

Kafka引擎结合Kafka使用,可实现订阅或发布数据流。
指定表引擎:
ENGINE = Kafka()
SETTINGS
kafka_broker_list = ‘host:port’,
kafka_topic_list = ‘topic1,topic2,…’,
kafka_group_name = ‘group_name’,
kafka_format = ‘data_format’,
kafka_row_delimiter = ‘delimiter_symbol’,
kafka_schema = ‘’,
kafka_num_consumers = N,
kafka_skip_broken_messages = N

必选参数:
kafka_broker_list :以逗号分隔的brokers列表。
kafka_topic_list :以逗号分隔的kafka的topic列表。
kafka_group_name: Kafka消费组。
kafka_format :消息的格式,例如JSONEachRow。
可选参数:
kafka_row_delimiter :行之间的分隔符。
kafka_schema :按需定义schema,例如Cap'n Proto格式需指定。
kafka_num_consumers :消费者数量,默认1,最多不超过
Topic的分区数。
kafka_skip_broken_messages :每个block中,Kafka的消息解析器容忍schema不兼容消息的数量。默认值:0

创建Kafka引擎表示例
示例1:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka(‘localhost:9092’,
‘topic’, ‘group1’, ‘JSONEachRow’);

示例2:
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS
kafka_broker_list =
‘localhost:9092’,
kafka_topic_list =
‘topic’,
kafka_group_name =
‘group1’,
kafka_format =
‘JSONEachRow’,

示例3:
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka(‘localhost:9092’, ‘topic’, ‘group1’)
SETTINGS kafka_format =
‘JSONEachRow’,
kafka_num_consumers = 4;

3、Kafka的扩展配置
Kafka引擎支持使用ClickHouse配置文件扩展配置。用户可以使用两个配置key, 全局的kafka和topic级别的kafka_*。首先应用全局配置,然后应用topic级别的配置。

cgrp smallest 250 100000 有关可能的配置选项的列表,参见librdkafka配置,链接: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 。 ClickHouse配置中使用下划线(_)代替点,例如,check.crcs=true 将配置为 true

MySQL引擎

1、MySQL引擎介绍

MySQL引擎可实现对MySQL数据库的表执行插入和查询操作。ClickHouse表结构可以不同于原始的MySQL表结构。
列名应当与原始MySQL表中的列名相同,但可以按任意顺序使用其中的一些列。列的数据类型可能与原始的MySQL表中的列类型不同,ClickHouse尝试进行数据类型转换。

2、MySQL表引擎参数:

ENGINE = MySQL(‘host:port’, ‘database’, ‘table’, ‘user’, ‘password’[, replace_query,
‘on_duplicate_clause’]);
引擎参数:

  1. host:port :MySQL server的地址。
  2. database :MySQL数据库名称。
  3. table : MySQL表名。
  4. user : MySQL用户名。
  5. password : MySQL用户密码。
  6. replace_query :将INSERT INTO查询转换为REPLACE INTO查询的标识。如果replace_query=1,查询将被替换。
  7. on_duplicate_clause : 将on_duplicate_clause的表达式添加到INSERT查询中。

例如:插入1个数据: INSERT INTO t (c1,c2) VALUES (‘a’, 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1,
on_duplicate_clause表达式为: UPDATE c2 = c2 + 1。
如果使用on_duplicate_clause,需设置参数replace_query=0。如果同时传递replace_query=1和on_duplicate_clause,ClickHouse将抛出异常。

MySQL引擎的样例

1、在MySQL创建表和插入数据
DROP TABLE test.test ;
create table test.test (
  id INT  NOT NULL AUTO_INCREMENT,
  cnt INT,
  PRIMARY KEY (id)
);

insert into test.test (id, cnt) VALUES (1,2);
select * from test.test;
-----如果mysql插入1条数据。
INSERT INTO test (id,cnt) VALUES (2, 3) ON DUPLICATE KEY UPDATE cnt = cnt + 1;   
关于DUPLICATE KEY UPDATE c2 = c2 + 1,它是MySQL中INSERT INTO语句的一种变体。当使用INSERT INTO语句插入数据时,如果插入的数据与已有数据的主键或唯一索引重复,则会触发错误。但是使用DUPLICATE KEY UPDATE c2 = c2 + 1可以在重复时更新该行数据。

2. 在ClickHouse中创建MySQL引擎的表

DROP TABLE mysql_table_dup;
CREATE TABLE mysql_table_dup
(
    id Int32,
    cnt Int32
)
ENGINE = MySQL('127.0.0.1:3306', 'test', 'test', 'root', '123456', 0, 'UPDATE cnt=cnt+1'); 

JDBC引擎

1、JDBC引擎介绍

ClickHouse通过JDBC引擎连接到外部数据库, 如MySQL、Oracle、PostgreSQL等。
ClickHouse使用一个单独的项目clickhouse-jdbc-bridge, 其作为一个守护进程运行。记得要启动服务: java -jar ./clickhouse-jdbc-bridge-1.0.1.jar --driver-path /root/jdbc/lib/
其中/root/jdbc/lib为数据库驱动包所在的目录
clickhouse-jdbc-bridge项目链接:https://github.com/ClickHouse/clickhouse-jdbc-bridge

2、表引擎参数:

ENGINE = JDBC(dbms_uri, external_database, external_table)

  1. dbms_uri :
    外部DBMS的URI:jdbc:<driver_name>://<host_name>:/?user=&password=。
    例如:jdbc:mysql://localhost:3306/?user=root&password=root 。
  2. external_database : 外部DBMS的数据库。
  3. external_table : 外部数据库的表名

Hive引擎

Hive引擎允许对HDFS Hive表执行 SELECT 查询。

创建ck表
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [ALIAS expr1],
    name2 [type2] [ALIAS expr2],
    ...
) ENGINE = Hive('thrift://host:port', 'database', 'table');
PARTITION BY expr 
;

thrift://host:port — Hive Metastore 地址
database — 远程数据库名.
table — 远程数据表名. 

  1. 列名应该与原来的Hive表相同,但你可以使用这些列中的一些,并以任何顺序,你也可以使用一些从其他列计算的别名列。
  2. 列类型与原Hive表的列类型保持一致。
  3. “Partition by expression”应与原Hive表保持一致,“Partition by expression”中的列应在表结构中。

二、Distributed引擎

Distributed引擎,也就是分布式引擎,本身不存储数据,但可以在多个服务器上进行分布式查询。读是自动并行的。读取时,远程服务器表的索引(如果存在)会被使用。

create table dis_table(id UInt16, name String)
Distributed(cluster_name, database, table, [sharding_key])

参数解析:
cluster_name:服务器配置文件中的集群名,在/etc/metrika.xml中配置的。具体配置见前文。
database:数据库名。
table:表名。
sharding_key:数据分片键。 
  1. 本地表:通常以_local为后缀进行命名。本地表是承接数据的载体,可以使用非Distributed的任意表引擎,一张本地表对应了一个数据分片
  2. 分布式表:分布式表只能使用Distribute表引擎,它与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表

有了分布式表之后,我们就可以向分布式表中插入数据,那么分布式表会根据配置的sharding_key将数据写入到不同的节点分片中。

分片键sharding_key:
分片键要求返回一个整形类型的取值,包括Int系列和UInt系列,有如下几种情况:

Distributed(cluster,database,table,userid)   --可以是一个具体的整形列字段
Distributed(cluster,database,table,rand())   --可以按照随机数划分
Distributed(cluster,database,table,intHash64(userid))   --可以按照某个整形列进行散列值划分

注意:如果不声明分片键,那么分布式表只能包含一个分片,这意味着只能映射一张本地表,否则,在写入数据时将会报错。如果分布式表只包含一个分片,也就失去了分布式的意义,所以虽然分片键是选填参数,但是通常都会按照业务规则进行设置。

2、Hive2CK(分布式查询)

1、创建local表和分布式表 :
create table adm.local表 on CLUSTER CK集群名称 (
       字段1           String,
       字段2           String,
       字段3           String,
       dt              int
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{分片变量参数}/adm/hive表名_local','{副本变量参数}')
order by (dept_id)
PARTITION BY (dt)
SETTINGS storage_policy = 'data_data1'
;

create table adm.ck表名(一般和hive表名一样) on CLUSTER cluster_5shards_2replicas (
       字段1                 String,
       字段2               String,
       字段3             String,
       dt                      int
)
ENGINE = Distributed(CK集群名称,adm,local表,rand())
;

2、数据同步工具和conf配置文件

批量数据使用数据同步工具(datax、waterdrop)进行同步。
Waterdrop 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。
1)使用 Waterdrop前请先准备好Spark和Java运行环境。java版本为jdk1.8。
2)一个完整的Waterdrop配置包含spark, input, filter, output,

spark是spark相关的配置,可配置的spark参数见: Spark Configuration, 其中master, deploy-mode两个参数不能在这里配置,需要在Waterdrop启动脚本中指定。
input可配置任意的input插件及其参数,具体参数随不同的input插件而变化。
filter可配置任意的filter插件及其参数,具体参数随不同的filter插件而变化。
output可配置任意的output插件及其参数,具体参数随不同的output插件而变化。filter处理完的数据,会发送给output中配置的每个插件。

conf文件如下:

spark {
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "hive2ck-"${schema}"."${table_name}"."${dt}
  spark.executor.instances = 2
  spark.executor.cores = 2
  spark.executor.memory = "10g"
  spark.yarn.maxAppAttempts = 1
  spark.dynamicAllocation.enabled = "true"
  spark.dynamicAllocation.maxExecutors = 10
}

input {
    hive {
        pre_sql = """select 字段1,字段2,字段3,,,,,,
                     from  adm.hive表名
 where dt='${dt}'
 ----'"""${dt}"'"
 
 
    table_name = "hive表名"
    }
}


filter {
convert {     --这里是要做转化的字段说明,CK不支持string。
    source_field = "dt",
    new_type = "integer"
}
}

output {
    clickhouse {
        host = ${ck_host}":"${ck_http_port}
        database = ${schema}
        table = ${table_name}"_local"
        cluster = "CK集群名称"
        username = ${ck_user}
        password = ${ck_passwd}
        clickhouse.socket_timeout = 100000
        bulk_size = 500000
    }
}

3、运行脚本

# client 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode client --config ./config/application.conf

# cluster 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode cluster --config ./config/application.conf

# cluster 模式下按固定周期从Hive表同步到CK分布表,如果conf文件没有实现准备,也可通过命令执行原表和目标表。不推荐这种。
./waterdrop/bin/start-waterdrop.sh 
--master yarn --deploy-mode cluster \
--config  ${conf_file} \
-i schema=${schema} -i table_name=${table_name} -i month=${month}  -i dt=${dt}  -i hour=${hour} \
-i ck_host="${ck_host}" -i ck_http_port="${ck_http_port}" -i ck_user="${ck_user}" -i ck_passwd="${ck_passwd}"


原文地址:https://blog.csdn.net/sheep8521/article/details/139088108

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!