N ways to write Kafka data into Doris with Flink, and how they compare
When developing a real-time data warehouse with Flink and Doris, the first problem to solve is how to ingest the real-time Kafka stream. The notes below draw on the official Doris documentation and code, summarized from practice in my own project, and include some details that are easy to trip over.
The Routine Load method
If you are on Doris 2.1 or later and do not need complex data transformations, I recommend Doris's built-in Routine Load; in my tests it is easy to use and performs well.
Ingesting real-time Kafka data
Doris can continuously consume data from a Kafka topic through the Routine Load import method. After a Routine Load job is submitted, Doris keeps the job running, continuously generating import tasks that consume messages from the specified topic in the Kafka cluster.
Routine Load is a streaming import job that supports exactly-once semantics, guaranteeing that data is neither lost nor duplicated. The following example shows how to pull JSON-formatted data from Kafka.
First, create the table to import into:
CREATE TABLE testdb.test_routineload_tbl(
user_id BIGINT NOT NULL COMMENT "user id",
name VARCHAR(20) COMMENT "name",
age INT COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
If the incoming data has non-duplicated primary keys, you can use the Unique key model instead (e.g. UNIQUE KEY(user_id)), which lets rows be updated or deleted.
Creating the Routine Load import job
In Doris, use the CREATE ROUTINE LOAD command to create the import job:
CREATE ROUTINE LOAD testdb.example_routine_load_json ON test_routineload_tbl
COLUMNS(user_id,name,age)
PROPERTIES(
"format"="json",
"max_error_number" = "999999999999",
"strip_outer_array"="true",
"jsonpaths"="[\"$.user_id\",\"$.name\",\"$.age\"]"
)
FROM KAFKA(
"kafka_broker_list" = "192.168.88.62:9092",
"kafka_topic" = "test-routine-load-json",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Pitfalls and details
max_error_number | The maximum number of error rows allowed within the sampling window; must be >= 0. The default is 0, meaning no error rows are allowed. The sampling window is max_batch_rows * 10. If the number of error rows inside the window exceeds max_error_number, the routine load job is paused and manual intervention is needed; inspect the data quality problems via the ErrorLogUrls field of SHOW ROUTINE LOAD. Rows filtered out by the WHERE clause do not count as error rows.
strip_outer_array | When the import format is json, strip_outer_array = true means the JSON data is presented as an array and each element is treated as one row; the default is false. Kafka JSON payloads often come wrapped in an outer array (square brackets at the top level); in that case set "strip_outer_array" = "true" to consume the topic in array mode. For example, the following payload is parsed into two rows: [{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]
For our company's Kafka data, "strip_outer_array"="true" had to be set; adjust it to your actual payloads. Some Kafka topics contain dirty data, which max_error_number can tolerate; alternatively, consider a script that detects paused jobs, alerts, and resumes them:
import pymysql
import requests, json
# Open the Doris connection (MySQL protocol); fill in your own host/user/password/db.
# 9030 is the default FE query port; adjust as needed.
db = pymysql.connect(host="host", user="user",
                     password="passwd", db="database", port=9030)
# Get a cursor
cur = db.cursor()
# Query the state of all routine load jobs
sql = "show routine load"
cur.execute(sql)
results = cur.fetchall()  # fetch all returned rows
for row in results:
    name = row[1]
    state = row[7]
    if state != 'RUNNING':
        err_log_urls = row[16]
        reason_state_changed = row[15]
        msg = ("Doris routine load job abnormal:\n name=%s \n state=%s \n "
               "reason_state_changed=%s \n err_log_urls=%s \n"
               "It will be resumed automatically; please check the error details."
               % (name, state, reason_state_changed, err_log_urls))
        payload_message = {
            "msg_type": "text",
            "content": {
                "text": msg
            }
        }
        url = 'lark alert webhook url'
        s = json.dumps(payload_message)
        r = requests.post(url, data=s)
        cur.execute("resume routine load for " + name)
cur.close()
db.close()
Checking the result with SHOW ROUTINE LOAD
mysql> SHOW ROUTINE LOAD FOR testdb.example_routine_load_json
I did not append \G as the official documentation does; with \G the statement errored for me. If the imported data has problems, run the command above and check these two fields:
ErrorLogUrls | The URL(s) for inspecting the rows that were filtered out for failing data-quality checks
OtherMsg | Other error messages
For details, see: Routine Load - Apache Doris
The Flink Doris Connector method
The Flink Doris Connector lets Flink operate on the data stored in Doris (read, insert, update, delete).
This method suits Doris versions below 2.1, or cases where the Kafka data needs complex filtering or transformation. Routine Load can do simple filtering too, but I find it cannot be customized deeply enough:
CREATE ROUTINE LOAD demo.kafka_job04 ON routine_test04
COLUMNS TERMINATED BY ",",
WHERE id >= 3
FROM KAFKA
(
"kafka_broker_list" = "10.16.10.6:9092",
"kafka_topic" = "routineLoad04",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Our project runs a very old Flink version (1.11); if you are on a newer Flink, please adjust according to the official documentation: Flink Doris Connector - Apache Doris
Complete example
The pom dependencies are as follows:
<properties>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<java.version>1.8</java.version>
<flink.version>1.12.2</flink.version>
<fastjson.version>1.2.83</fastjson.version>
<hadoop.version>2.8.3</hadoop.version>
<scope.mode>compile</scope.mode>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.6.5-10.0</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.14_2.12</artifactId>
<version>1.0.3</version>
</dependency>
<!--After adding the following two dependencies, Flink's log will appear-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.12</version>
</dependency>
</dependencies>
FlinkKafka2Doris:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class FlinkKafka2Doris {
//kafka address
private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
//kafka groupName
private static final String groupName = "test_flink_doris_group";
//kafka topicName
private static final String topicName = "test_flink_doris";
//doris ip port
private static final String hostPort = "xxx:8030";
//doris dbName
private static final String dbName = "test1";
//doris tbName
private static final String tbName = "doris_test_source_2";
//doris userName
private static final String userName = "root";
//doris password
private static final String password = "";
//doris columns
private static final String columns = "name,age,price,sale";
//json format
private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer);
props.put("group.id", groupName);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", "10000");
StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
blinkStreamEnv.enableCheckpointing(10000);
blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
new SimpleStringSchema(),
props);
DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);
//filter or transform the data here (a minimal sketch follows after this class)
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);
dataStreamSource.addSink(new DorisSink(dorisStreamLoad,columns,jsonFormat));
blinkStreamEnv.execute("flink kafka to doris");
}
}
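The comment in main() marks where filtering or transformation can happen before the data reaches the Doris sink. Below is a minimal, hedged sketch of what that spot could look like, assuming each Kafka record is a single JSON object with the fields name, age, price and sale (adjust to your real schema); the variable cleanedStream and the sample cleanup logic are illustrative and not part of the original code:
//Additional imports assumed for this sketch:
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject;
//import org.apache.flink.api.common.functions.FilterFunction;
//import org.apache.flink.api.common.functions.MapFunction;
//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
SingleOutputStreamOperator<String> cleanedStream = dataStreamSource
        .filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) {
                //drop records that are not valid JSON objects (dirty data)
                try {
                    return JSON.parseObject(value) != null;
                } catch (Exception e) {
                    return false;
                }
            }
        })
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                //example transformation: force age to an integer, defaulting to 0
                JSONObject obj = JSON.parseObject(value);
                obj.put("age", obj.getIntValue("age"));
                return obj.toJSONString();
            }
        });
//then sink the cleaned stream instead of the raw source:
//cleanedStream.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));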
DorisStreamLoad:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.io.IOException;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Calendar;
import java.util.UUID;
/**
* doris streamLoad
*/
public class DorisStreamLoad implements Serializable {
private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
private String hostPort;
private String db;
private String tbl;
private String user;
private String passwd;
private String loadUrlStr;
private String authEncoding;
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
this.hostPort = hostPort;
this.db = db;
this.tbl = tbl;
this.user = user;
this.passwd = passwd;
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
}
private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
conn.addRequestProperty("label", label);
conn.addRequestProperty("max_filter_ratio", "0");
conn.addRequestProperty("strict_mode", "true");
conn.addRequestProperty("columns", columns);
conn.addRequestProperty("format", "json");
conn.addRequestProperty("jsonpaths", jsonformat);
conn.addRequestProperty("strip_outer_array", "true");
conn.setDoOutput(true);
conn.setDoInput(true);
return conn;
}
public static class LoadResponse {
public int status;
public String respMsg;
public String respContent;
public LoadResponse(int status, String respMsg, String respContent) {
this.status = status;
this.respMsg = respMsg;
this.respContent = respContent;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("status: ").append(status);
sb.append(", resp msg: ").append(respMsg);
sb.append(", resp content: ").append(respContent);
return sb.toString();
}
}
public LoadResponse loadBatch(String data, String columns, String jsonformat) {
Calendar calendar = Calendar.getInstance();
String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
UUID.randomUUID().toString().replaceAll("-", ""));
HttpURLConnection feConn = null;
HttpURLConnection beConn = null;
try {
// build request and send to fe
feConn = getConnection(loadUrlStr, label, columns, jsonformat);
int status = feConn.getResponseCode();
// fe send back http response code TEMPORARY_REDIRECT 307 and new be location
if (status != 307) {
throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
}
String location = feConn.getHeaderField("Location");
if (location == null) {
throw new Exception("redirect location is null");
}
// build request and send to new be location
beConn = getConnection(location, label, columns, jsonformat);
// send data to be
BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
bos.write(data.getBytes());
bos.close();
// get respond
status = beConn.getResponseCode();
String respMsg = beConn.getResponseMessage();
InputStream stream = (InputStream) beConn.getContent();
BufferedReader br = new BufferedReader(new InputStreamReader(stream));
StringBuilder response = new StringBuilder();
String line;
while ((line = br.readLine()) != null) {
response.append(line);
}
return new LoadResponse(status, respMsg, response.toString());
} catch (Exception e) {
e.printStackTrace();
String err = "failed to stream load data with label: " + label;
log.warn(err, e);
return new LoadResponse(-1, e.getMessage(), err);
} finally {
if (feConn != null) {
feConn.disconnect();
}
if (beConn != null) {
beConn.disconnect();
}
}
}
}
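Before wiring this into a Flink job, it can help to call DorisStreamLoad directly to verify connectivity and the columns/jsonpaths mapping. The snippet below is a minimal sketch under the assumption that it sits in the same package as DorisStreamLoad; the FE address, database, table and credentials are placeholders taken from the constants above, not tested values:
public class DorisStreamLoadDemo {
    public static void main(String[] args) {
        //placeholders: replace with your FE http host:port, database, table and credentials
        DorisStreamLoad loader = new DorisStreamLoad("xxx:8030", "test1",
                "doris_test_source_2", "root", "");
        //strip_outer_array=true is set in getConnection(), so the payload is a JSON array
        String batch = "[{\"name\":\"Emily\",\"age\":25,\"price\":12.3,\"sale\":1},"
                + "{\"name\":\"Benjamin\",\"age\":35,\"price\":7.5,\"sale\":2}]";
        String columns = "name,age,price,sale";
        String jsonpaths = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
        DorisStreamLoad.LoadResponse resp = loader.loadBatch(batch, columns, jsonpaths);
        //a 200 status plus Status "Success" (or "Publish Timeout") means the load went through
        System.out.println(resp);
    }
}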
DorisSink:
import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Custom doris sink
*/
public class DorisSink extends RichSinkFunction<String> {
private static final Logger log = LoggerFactory.getLogger(DorisSink.class);
private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
private DorisStreamLoad dorisStreamLoad;
private String columns;
private String jsonFormat;
public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
this.dorisStreamLoad = dorisStreamLoad;
this.columns = columns;
this.jsonFormat = jsonFormat;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
/**
* Determine whether StreamLoad is successful
*
* @param respContent streamLoad returns the entity
* @return
*/
public static Boolean checkStreamLoadStatus(RespContent respContent) {
if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
&& respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {
return true;
} else {
return false;
}
}
@Override
public void invoke(String value, Context context) throws Exception {
DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
if (loadResponse != null && loadResponse.status == 200) {
RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
if (!checkStreamLoadStatus(respContent)) {
log.error("Stream Load failed: {}", loadResponse);
}
} else {
log.error("Stream Load Request failed:{}", loadResponse);
}
}
}
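One caveat with the sink above: invoke() issues one Stream Load request per Kafka record, which creates many tiny load transactions. A common improvement is to buffer records and flush them as a single JSON array. The class below is a simplified, illustrative sketch of that idea (not the original author's code); it assumes each incoming record is one JSON object and that the class lives in the same package as DorisStreamLoad. A production version would also flush on a timer and on checkpoints to bound latency and avoid data loss:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.util.ArrayList;
import java.util.List;
/**
 * Simplified batching variant of the Doris sink (illustrative sketch only).
 */
public class BufferedDorisSink extends RichSinkFunction<String> {
    //flush threshold; tune to your throughput
    private static final int BATCH_SIZE = 1000;
    private final DorisStreamLoad dorisStreamLoad;
    private final String columns;
    private final String jsonFormat;
    private transient List<String> buffer;
    public BufferedDorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        buffer = new ArrayList<>();
    }
    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value);
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }
    @Override
    public void close() throws Exception {
        //push out whatever is left when the task shuts down
        flush();
        super.close();
    }
    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        //strip_outer_array=true is set in DorisStreamLoad, so send one JSON array per request
        String payload = "[" + String.join(",", buffer) + "]";
        DorisStreamLoad.LoadResponse resp = dorisStreamLoad.loadBatch(payload, columns, jsonFormat);
        //minimal error handling for the sketch; reuse checkStreamLoadStatus in real code
        if (resp == null || resp.status != 200) {
            throw new RuntimeException("Stream Load request failed: " + resp);
        }
        buffer.clear();
    }
}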
RespContent:
import java.io.Serializable;
/**
* Entity returned by streamLoad
*/
public class RespContent implements Serializable {
private static final long serialVersionUID = 1L;
/**
* The transaction ID of the import. Users normally do not need to be aware of it.
*/
private int TxnId;
/**
* Import Label. Specified by the user or automatically generated by the system.
*/
private String Label;
/**
* Import complete status.
* "Success": Indicates that the import was successful.
* "Publish Timeout": This status also indicates that the import has been completed, but the data may be visible with a delay, and there is no need to retry.
* "Label Already Exists": The Label is duplicated, and the Label needs to be replaced.
*/
private String Status;
/**
* The status of the import job corresponding to the existing Label.
* This field will only be displayed when the Status is "Label Already Exists".
* The user can know the status of the import job corresponding to the existing Label through this status.
* "RUNNING" means that the job is still executing, and "FINISHED" means that the job is successful.
*/
private String ExistingJobStatus;
/**
* Import error information
*/
private String Message;
/**
* Import the total number of processed rows
*/
private long NumberTotalRows;
/**
* The number of rows successfully imported.
*/
private long NumberLoadedRows;
/**
* Number of rows with unqualified data quality.
*/
private int NumberFilteredRows;
/**
* The number of rows filtered by the where condition
*/
private int NumberUnselectedRows;
/**
* Number of bytes imported.
*/
private long LoadBytes;
/**
* Import completion time. The unit is milliseconds.
*/
private int LoadTimeMs;
/**
* The time it takes to request Fe to start a transaction, in milliseconds
*/
private int BeginTxnTimeMs;
/**
* The time it takes to request Fe to obtain the import data execution plan, in milliseconds
*/
private int StreamLoadPutTimeMs;
/**
* The time spent reading data, in milliseconds
*/
private int ReadDataTimeMs;
/**
* The time it takes to perform the data write operation, in milliseconds.
*/
private int WriteDataTimeMs;
/**
* The time taken to submit and publish the transaction request to Fe, in milliseconds
*/
private int CommitAndPublishTimeMs;
/**
* If there is a data quality problem, check the specific error line by visiting this URL
*/
private String ErrorURL;
public int getTxnId() {
return TxnId;
}
public void setTxnId(int txnId) {
TxnId = txnId;
}
public String getLabel() {
return Label;
}
public void setLabel(String label) {
Label = label;
}
public String getStatus() {
return Status;
}
public void setStatus(String status) {
Status = status;
}
public String getExistingJobStatus() {
return ExistingJobStatus;
}
public void setExistingJobStatus(String existingJobStatus) {
ExistingJobStatus = existingJobStatus;
}
public String getMessage() {
return Message;
}
public void setMessage(String message) {
Message = message;
}
public long getNumberTotalRows() {
return NumberTotalRows;
}
public void setNumberTotalRows(long numberTotalRows) {
NumberTotalRows = numberTotalRows;
}
public long getNumberLoadedRows() {
return NumberLoadedRows;
}
public void setNumberLoadedRows(long numberLoadedRows) {
NumberLoadedRows = numberLoadedRows;
}
public int getNumberFilteredRows() {
return NumberFilteredRows;
}
public void setNumberFilteredRows(int numberFilteredRows) {
NumberFilteredRows = numberFilteredRows;
}
public int getNumberUnselectedRows() {
return NumberUnselectedRows;
}
public void setNumberUnselectedRows(int numberUnselectedRows) {
NumberUnselectedRows = numberUnselectedRows;
}
public long getLoadBytes() {
return LoadBytes;
}
public void setLoadBytes(long loadBytes) {
LoadBytes = loadBytes;
}
public int getLoadTimeMs() {
return LoadTimeMs;
}
public void setLoadTimeMs(int loadTimeMs) {
LoadTimeMs = loadTimeMs;
}
public int getBeginTxnTimeMs() {
return BeginTxnTimeMs;
}
public void setBeginTxnTimeMs(int beginTxnTimeMs) {
BeginTxnTimeMs = beginTxnTimeMs;
}
public int getStreamLoadPutTimeMs() {
return StreamLoadPutTimeMs;
}
public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
StreamLoadPutTimeMs = streamLoadPutTimeMs;
}
public int getReadDataTimeMs() {
return ReadDataTimeMs;
}
public void setReadDataTimeMs(int readDataTimeMs) {
ReadDataTimeMs = readDataTimeMs;
}
public int getWriteDataTimeMs() {
return WriteDataTimeMs;
}
public void setWriteDataTimeMs(int writeDataTimeMs) {
WriteDataTimeMs = writeDataTimeMs;
}
public int getCommitAndPublishTimeMs() {
return CommitAndPublishTimeMs;
}
public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
CommitAndPublishTimeMs = commitAndPublishTimeMs;
}
public String getErrorURL() {
return ErrorURL;
}
public void setErrorURL(String errorURL) {
ErrorURL = errorURL;
}
@Override
public String toString() {
return "RespContent{" +
"TxnId=" + TxnId +
", Label='" + Label + '\'' +
", Status='" + Status + '\'' +
", ExistingJobStatus='" + ExistingJobStatus + '\'' +
", Message='" + Message + '\'' +
", NumberTotalRows=" + NumberTotalRows +
", NumberLoadedRows=" + NumberLoadedRows +
", NumberFilteredRows=" + NumberFilteredRows +
", NumberUnselectedRows=" + NumberUnselectedRows +
", LoadBytes=" + LoadBytes +
", LoadTimeMs=" + LoadTimeMs +
", BeginTxnTimeMs=" + BeginTxnTimeMs +
", StreamLoadPutTimeMs=" + StreamLoadPutTimeMs +
", ReadDataTimeMs=" + ReadDataTimeMs +
", WriteDataTimeMs=" + WriteDataTimeMs +
", CommitAndPublishTimeMs=" + CommitAndPublishTimeMs +
", ErrorURL='" + ErrorURL + '\'' +
'}';
}
}
Original post: https://blog.csdn.net/linweidong/article/details/145278529