
N Ways to Write Kafka Data into Doris with Flink, and How They Compare

When building a real-time data warehouse with Flink + Doris, the first problem to solve is how to ingest the real-time Kafka stream. The notes below are based on the official Doris documentation and code, summarized from practice in my own project, including some details that are easy to trip over.

Routine Load Method

If you are on Doris 2.1 or later and do not need complex data transformation, I recommend Doris's built-in Routine Load; in my tests it is convenient to use and performs well.

Ingesting real-time Kafka data

Doris can continuously consume data from a Kafka topic through the Routine Load import method. After a Routine Load job is submitted, Doris keeps the job running, generating import tasks in real time that continuously consume messages from the specified topic in the Kafka cluster.

Routine Load is a streaming import job that supports exactly-once semantics, guaranteeing that data is neither lost nor duplicated. The example below shows how to pull Kafka data in JSON format.

First, create the target table for the import:

CREATE TABLE testdb.test_routineload_tbl(
    user_id            BIGINT       NOT NULL COMMENT "user id",
    name               VARCHAR(20)           COMMENT "name",
    age                INT                   COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

If the incoming data has unique (non-duplicated) primary keys, you can use the Unique data model instead, which allows rows to be deleted or updated.

Creating the Routine Load import job

In Doris, create the import job with the CREATE ROUTINE LOAD command:

CREATE ROUTINE LOAD testdb.example_routine_load_json ON test_routineload_tbl
COLUMNS(user_id,name,age)
PROPERTIES(
    "format"="json",
    "max_error_number" = "999999999999",
     "strip_outer_array"="true",
    "jsonpaths"="[\"$.user_id\",\"$.name\",\"$.age\"]"
)
FROM KAFKA(
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-routine-load-json",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Pitfall details

max_error_number: the maximum number of error rows allowed within the sampling window. It must be greater than or equal to 0; the default is 0, i.e. no error rows are allowed. The sampling window is max_batch_rows * 10. If the number of error rows inside the window exceeds max_error_number, the routine load job is paused and needs manual intervention: check the data-quality problems through the ErrorLogUrls field of the SHOW ROUTINE LOAD command. Rows filtered out by the WHERE clause do not count as error rows.

strip_outer_array: when the import format is json, strip_outer_array = true means the JSON data is presented as an array and every element in the array is treated as one row; the default is false. Kafka messages often wrap the JSON in an outer array, i.e. the outermost layer is a pair of square brackets []. In that case set "strip_outer_array" = "true" to consume the topic in array mode. For example, the following data is parsed into two rows: [{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]

Our company's Kafka requires "strip_outer_array"="true"; adjust it according to your own data.

Some Kafka topics contain dirty data; you can use max_error_number to filter it out, or consider a script that detects paused jobs, sends an alert, and resumes them:
import json

import pymysql
import requests

# Open the database connection (fill in your own connection details)
db = pymysql.connect(host="host", user="user",
                     password="passwd", db="database", port=port)

# Get a cursor
cur = db.cursor()

# Query the state of all routine load jobs
sql = "show routine load"
cur.execute(sql)
results = cur.fetchall()
for row in results:
    name = row[1]
    state = row[7]
    if state != 'RUNNING':
        err_log_urls = row[16]
        reason_state_changed = row[15]
        msg = ("doris import job abnormal:\n name=%s \n state=%s \n"
               " reason_state_changed=%s \n err_log_urls=%s \n"
               "about to resume automatically, please check the error details"
               % (name, state, reason_state_changed, err_log_urls))
        payload_message = {
            "msg_type": "text",
            "content": {
                "text": msg
            }
        }
        url = 'lark alert url'
        s = json.dumps(payload_message)
        r = requests.post(url, data=s)
        # Automatically resume the paused job after alerting
        cur.execute("resume routine load for " + name)

cur.close()
db.close()

Check the result with SHOW ROUTINE LOAD:

mysql> SHOW ROUTINE LOAD FOR testdb.example_routine_load

I did not append \G as the official documentation does; adding \G gave me an error.

If the imported data has problems, run the command above and check these two fields:

ErrorLogUrls: the URL where the rows filtered out for data-quality problems can be viewed
OtherMsg: other error messages

For details, see: Routine Load - Apache Doris

Flink Doris Connector Method

Flink Doris Connector lets Flink operate on the data stored in Doris (read, insert, update, delete).

This method suits Doris versions below 2.1, or cases where the Kafka data needs complex filtering or transformation. The Routine Load method can also do simple filtering, but I found it not customizable enough:

CREATE ROUTINE LOAD demo.kafka_job04 ON routine_test04
COLUMNS TERMINATED BY ",",
WHERE id >= 3
FROM KAFKA
(
    "kafka_broker_list" = "10.16.10.6:9092",
    "kafka_topic" = "routineLoad04",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Our project is on a very old Flink version (1.11). If you are on a newer Flink, adjust according to the official documentation: Flink Doris Connector - Apache Doris

Complete Example

The pom dependencies are as follows:

 <properties>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <java.version>1.8</java.version>
        <flink.version>1.12.2</flink.version>
        <fastjson.version>1.2.83</fastjson.version>
        <hadoop.version>2.8.3</hadoop.version>
        <scope.mode>compile</scope.mode>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.6.5-10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.14_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>
        <!--After adding the following two dependencies, Flink's log will appear-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
       
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.12</version>
        </dependency>
    </dependencies>

FlinkKafka2Doris:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkKafka2Doris {
    //kafka address
    private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
    //kafka groupName
    private static final String groupName = "test_flink_doris_group";
    //kafka topicName
    private static final String topicName = "test_flink_doris";
    //doris ip port
    private static final String hostPort = "xxx:8030";
    //doris dbName
    private static final String dbName = "test1";
    //doris tbName
    private static final String tbName = "doris_test_source_2";
    //doris userName
    private static final String userName = "root";
    //doris password
    private static final String password = "";
    //doris columns
    private static final String columns = "name,age,price,sale";
    //json format
    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "10000");

        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        blinkStreamEnv.enableCheckpointing(10000);
        blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
                new SimpleStringSchema(),
                props);

        DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);

        // Apply any filtering or transformation on the DataStream here (a sketch follows after this class).

        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);

        dataStreamSource.addSink(new DorisSink(dorisStreamLoad,columns,jsonFormat));

        blinkStreamEnv.execute("flink kafka to doris");

    }
}
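
Where the comment marks the filtering/transformation step, ordinary DataStream operators can be chained in before the sink. Below is a minimal sketch, assuming the Kafka messages are JSON objects and that records missing a "name" field should be dropped; the field name and the rule are illustrative only, not part of the original job (it reuses the fastjson dependency already declared in the pom):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStream;

// Inside main(), in place of the "filtering or transformation" comment:
DataStream<String> cleaned = dataStreamSource
        .filter(value -> {
            try {
                // Keep only well-formed JSON records that carry a "name" field
                // (illustrative rule; replace with your own business logic).
                JSONObject obj = JSON.parseObject(value);
                return obj != null && obj.get("name") != null;
            } catch (Exception e) {
                // Drop dirty records instead of failing the whole job.
                return false;
            }
        });

// The cleaned stream, rather than the raw source, then goes to the Doris sink:
cleaned.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));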

DorisStreamLoad:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.Serializable;
import java.io.IOException;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Calendar;
import java.util.UUID;


/**
 * doris streamLoad
 */

public class DorisStreamLoad implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);

    private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
    private String hostPort;
    private String db;
    private String tbl;
    private String user;
    private String passwd;
    private String loadUrlStr;
    private String authEncoding;


    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
        this.hostPort = hostPort;
        this.db = db;
        this.tbl = tbl;
        this.user = user;
        this.passwd = passwd;
        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
    }

    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.addRequestProperty("max_filter_ratio", "0");
        conn.addRequestProperty("strict_mode", "true");
        conn.addRequestProperty("columns", columns);
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("jsonpaths", jsonformat);
        conn.addRequestProperty("strip_outer_array", "true");
        conn.setDoOutput(true);
        conn.setDoInput(true);

        return conn;
    }

    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(status);
            sb.append(", resp msg: ").append(respMsg);
            sb.append(", resp content: ").append(respContent);
            return sb.toString();
        }
    }

    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
        Calendar calendar = Calendar.getInstance();
        String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));

        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;
        try {
            // build request and send to fe
            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
            int status = feConn.getResponseCode();
            // fe send back http response code TEMPORARY_REDIRECT 307 and new be location
            if (status != 307) {
                throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
            }
            String location = feConn.getHeaderField("Location");
            if (location == null) {
                throw new Exception("redirect location is null");
            }
            // build request and send to new be location
            beConn = getConnection(location, label, columns, jsonformat);
            // send data to be
            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
            bos.write(data.getBytes(StandardCharsets.UTF_8));
            bos.close();

            // get respond
            status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            InputStream stream = (InputStream) beConn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
            return new LoadResponse(status, respMsg, response.toString());

        } catch (Exception e) {
            e.printStackTrace();
            String err = "failed to load audit via AuditLoader plugin with label: " + label;
            log.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
        } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }

}

DorisSink:

import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Custom doris sink
 */
public class DorisSink extends RichSinkFunction<String> {


    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private DorisStreamLoad dorisStreamLoad;

    private String columns;

    private String jsonFormat;

    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }


    /**
     * Determine whether StreamLoad is successful
     *
     * @param respContent streamLoad returns the entity
     * @return
     */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {
            return true;
        } else {
            return false;
        }
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
        if (loadResponse != null && loadResponse.status == 200) {
            RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
            if (!checkStreamLoadStatus(respContent)) {
                log.error("Stream Load fail{}:", loadResponse);
            }
        } else {
            log.error("Stream Load Request failed:{}", loadResponse);
        }
    }
}

RespContent:

import java.io.Serializable;

/**
 * Entity returned by streamLoad
 */
public class RespContent implements Serializable {

    private static final long serialVersionUID = 1L;


    /**
     * Imported transaction ID. The user may not perceive it.
     */
    private int TxnId;
    /**
     * Import Label. Specified by the user or automatically generated by the system.
     */
    private String Label;
    /**
     * Import complete status.
     * "Success": Indicates that the import was successful.
     * "Publish Timeout": This status also indicates that the import has been completed, but the data may be visible with a delay, and there is no need to retry.
     * "Label Already Exists": The Label is duplicated, and the Label needs to be replaced.
     */
    private String Status;
    /**
     * The status of the import job corresponding to the existing Label.
     * This field will only be displayed when the Status is "Label Already Exists".
     * The user can know the status of the import job corresponding to the existing Label through this status.
     * "RUNNING" means that the job is still executing, and "FINISHED" means that the job is successful.
     */
    private String ExistingJobStatus;
    /**
     * Import error information
     */
    private String Message;
    /**
     * Import the total number of processed rows
     */
    private long NumberTotalRows;
    /**
     * The number of rows successfully imported.
     */
    private long NumberLoadedRows;
    /**
     * Number of rows with unqualified data quality.
     */
    private int NumberFilteredRows;
    /**
     * The number of rows filtered by the where condition
     */
    private int NumberUnselectedRows;
    /**
     * Number of bytes imported.
     */
    private long LoadBytes;
    /**
     * Import completion time. The unit is milliseconds.
     */
    private int LoadTimeMs;
    /**
     * The time it takes to request Fe to start a transaction, in milliseconds
     */
    private int BeginTxnTimeMs;
    /**
     * The time it takes to request Fe to obtain the import data execution plan, in milliseconds
     */
    private int StreamLoadPutTimeMs;
    /**
     * The time spent reading data, in milliseconds
     */
    private int ReadDataTimeMs;
    /**
     * The time spent writing data, in milliseconds.
     */
    private int WriteDataTimeMs;
    /**
     * The time taken to submit and publish the transaction request to Fe, in milliseconds
     */
    private int CommitAndPublishTimeMs;
    /**
     * If there is a data quality problem, check the specific error line by visiting this URL
     */
    private String ErrorURL;

    public int getTxnId() {
        return TxnId;
    }

    public void setTxnId(int txnId) {
        TxnId = txnId;
    }

    public String getLabel() {
        return Label;
    }

    public void setLabel(String label) {
        Label = label;
    }

    public String getStatus() {
        return Status;
    }

    public void setStatus(String status) {
        Status = status;
    }

    public String getExistingJobStatus() {
        return ExistingJobStatus;
    }

    public void setExistingJobStatus(String existingJobStatus) {
        ExistingJobStatus = existingJobStatus;
    }

    public String getMessage() {
        return Message;
    }

    public void setMessage(String message) {
        Message = message;
    }

    public long getNumberTotalRows() {
        return NumberTotalRows;
    }

    public void setNumberTotalRows(long numberTotalRows) {
        NumberTotalRows = numberTotalRows;
    }

    public long getNumberLoadedRows() {
        return NumberLoadedRows;
    }

    public void setNumberLoadedRows(long numberLoadedRows) {
        NumberLoadedRows = numberLoadedRows;
    }

    public int getNumberFilteredRows() {
        return NumberFilteredRows;
    }

    public void setNumberFilteredRows(int numberFilteredRows) {
        NumberFilteredRows = numberFilteredRows;
    }

    public int getNumberUnselectedRows() {
        return NumberUnselectedRows;
    }

    public void setNumberUnselectedRows(int numberUnselectedRows) {
        NumberUnselectedRows = numberUnselectedRows;
    }

    public long getLoadBytes() {
        return LoadBytes;
    }

    public void setLoadBytes(long loadBytes) {
        LoadBytes = loadBytes;
    }

    public int getLoadTimeMs() {
        return LoadTimeMs;
    }

    public void setLoadTimeMs(int loadTimeMs) {
        LoadTimeMs = loadTimeMs;
    }

    public int getBeginTxnTimeMs() {
        return BeginTxnTimeMs;
    }

    public void setBeginTxnTimeMs(int beginTxnTimeMs) {
        BeginTxnTimeMs = beginTxnTimeMs;
    }

    public int getStreamLoadPutTimeMs() {
        return StreamLoadPutTimeMs;
    }

    public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
        StreamLoadPutTimeMs = streamLoadPutTimeMs;
    }

    public int getReadDataTimeMs() {
        return ReadDataTimeMs;
    }

    public void setReadDataTimeMs(int readDataTimeMs) {
        ReadDataTimeMs = readDataTimeMs;
    }

    public int getWriteDataTimeMs() {
        return WriteDataTimeMs;
    }

    public void setWriteDataTimeMs(int writeDataTimeMs) {
        WriteDataTimeMs = writeDataTimeMs;
    }

    public int getCommitAndPublishTimeMs() {
        return CommitAndPublishTimeMs;
    }

    public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
        CommitAndPublishTimeMs = commitAndPublishTimeMs;
    }

    public String getErrorURL() {
        return ErrorURL;
    }

    public void setErrorURL(String errorURL) {
        ErrorURL = errorURL;
    }

    @Override
    public String toString() {
        return "RespContent{" +
                "TxnId=" + TxnId +
                ", Label='" + Label + '\'' +
                ", Status='" + Status + '\'' +
                ", ExistingJobStatus='" + ExistingJobStatus + '\'' +
                ", Message='" + Message + '\'' +
                ", NumberTotalRows=" + NumberTotalRows +
                ", NumberLoadedRows=" + NumberLoadedRows +
                ", NumberFilteredRows=" + NumberFilteredRows +
                ", NumberUnselectedRows=" + NumberUnselectedRows +
                ", LoadBytes=" + LoadBytes +
                ", LoadTimeMs=" + LoadTimeMs +
                ", BeginTxnTimeMs=" + BeginTxnTimeMs +
                ", StreamLoadPutTimeMs=" + StreamLoadPutTimeMs +
                ", ReadDataTimeMs=" + ReadDataTimeMs +
                ", WriteDataTimeMs=" + WriteDataTimeMs +
                ", CommitAndPublishTimeMs=" + CommitAndPublishTimeMs +
                ", ErrorURL='" + ErrorURL + '\'' +
                '}';
    }
}


Original article: https://blog.csdn.net/linweidong/article/details/145278529
