
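Getting Started with ClickHouse

Introduction

ClickHouse is a column-oriented database. Traditional databases generally store data row by row, whereas ClickHouse stores it column by column: each column has its own storage and holds only that column's values rather than entire rows. This brings several benefits (a high compression ratio, fast queries, and support for high concurrency) and gives ClickHouse very good performance on large data sets.

Advantages:

  • High compression ratio: each column holds values of the same type, so more efficient compression algorithms can be applied when the data is stored, which saves storage space.
  • Fast queries: column storage suits queries that touch only specific columns, because only the relevant column data has to be loaded and processed, which is more efficient than row storage. Queries that aggregate large amounts of data (such as SUM or AVG) are usually faster in particular.
  • High concurrency: because only the required columns are loaded on read, each query does less I/O, which gives better concurrent performance and suits large-scale data queries.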
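
The compression and aggregation points above can be illustrated with a minimal Java sketch. It is not how ClickHouse is implemented; the class `ColumnLayoutSketch`, its methods, and the sample values are made up for illustration. It stores a tiny table as one array per column, run-length encodes a repetitive column, and sums a numeric column without reading any other column.

```java
import java.util.ArrayList;
import java.util.List;

public class ColumnLayoutSketch {

    /** One run of identical values: the value and how many times it repeats. */
    public static class Run {
        public final String value;
        public int count;

        Run(String value, int count) {
            this.value = value;
            this.count = count;
        }
    }

    /** Run-length encodes a column; consecutive equal values collapse into one entry. */
    public static List<Run> runLengthEncode(String[] column) {
        List<Run> runs = new ArrayList<>();
        for (String v : column) {
            Run last = runs.isEmpty() ? null : runs.get(runs.size() - 1);
            if (last != null && last.value.equals(v)) {
                last.count++;
            } else {
                runs.add(new Run(v, 1));
            }
        }
        return runs;
    }

    /** Sums one column; no other column of the table is touched. */
    public static double sum(double[] column) {
        double total = 0;
        for (double v : column) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // Column-oriented layout: one array per column instead of one object per row.
        String[] dataType = {"ship", "ship", "ship", "plane", "plane"};
        double[] speed = {10.0, 12.5, 11.0, 250.0, 240.0};

        System.out.println(runLengthEncode(dataType).size() + " runs for " + dataType.length + " values");
        System.out.println("sum(speed) = " + sum(speed));
    }
}
```

Values of the same type stored next to each other tend to repeat or vary slowly, which is what makes a column easier to compress than a row that mixes types.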

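Working with ClickHouse from Java

The examples below use the Java Client V2 provided by ClickHouse.

The ClickHouse JDBC driver is not used here, mainly because some features may not be supported through it and changing that later would be costly. In addition, the JDBC driver is currently built on Client V1 and is expected to move to Client V2, and that upgrade may bring further changes.

Configuring ClickHouse

Basic configuration: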

@ConfigurationProperties(prefix = "clickhouse")
@Data
@Configuration
public class ClickHouseProperties {

    private String url;

    private String username;

    private String password;

    private String database;
}

Configuring the ClickHouse client:

@Configuration
public class ClickHouseConfig {


    @Bean
    public Client chDirectClient(ClickHouseProperties chProperties) {
        return new Client.Builder()
                .addEndpoint(chProperties.getUrl())
                .setUsername(chProperties.getUsername())
                .setPassword(chProperties.getPassword())

                // sets the maximum number of connections to the server at a time
                // this is important for services handling many concurrent requests to ClickHouse
                .setMaxConnections(100)
                .setLZ4UncompressedBufferSize(1058576)
                .setSocketRcvbuf(500_000)
                .setSocketTcpNodelay(true)
                .setSocketSndbuf(500_000)
                .setClientNetworkBufferSize(500_000)
                .allowBinaryReaderToReuseBuffers(true) // using buffer pool for binary reader
                // Enables the experimental JSON type; its implementation is still incomplete, so using it is not recommended
                .serverSetting("allow_experimental_json_type", "1")
                // allow JSON transcoding as a string
//                .serverSetting(ServerSettings.INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1")
//                .serverSetting(ServerSettings.OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1")
                .setDefaultDatabase(chProperties.getDatabase())
                .setConnectTimeout(10000)
                .setSocketTimeout(10000)
                .build();
    }
}

CRUD operations

package edu.whu.metadata.dao.clickhouse;

import cn.hutool.core.util.StrUtil;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.internal.ServerSettings;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.metrics.ServerMetrics;
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseFormat;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.whu.metadata.entity.TrackData;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Repository;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static edu.whu.metadata.entity.TrackData.TABLE_NAME;

@Slf4j
@Repository
public class TrackDataDao {

    @Resource
    private Client chClient;

    InsertSettings insertSettings = new InsertSettings();

    /**
     * Streaming insert: reads a stream of JSONEachRow data (one JSON object per line)
     * and writes it to ClickHouse.
     */
    public void insertStream(InputStream dataStream) {
        try {
            try (InsertResponse response = chClient.insert(TABLE_NAME, dataStream, ClickHouseFormat.JSONEachRow,
                    insertSettings).get(1, TimeUnit.HOURS)) {

                log.info("Insert finished: {} rows written", response.getMetrics().getMetric(ServerMetrics.NUM_ROWS_WRITTEN).getLong());
            } catch (Exception e) {
                log.error("Failed to write JSONEachRow data", e);
                throw new RuntimeException(e);
            }
        } finally {
            try {
                dataStream.close();
            } catch (Exception e) {
                log.error("Failed to close data stream", e);
            }
        }

    }

    /**
     * Inserts a single TrackData record.
     */
    public void insert(TrackData trackData) {
        List<TrackData> trackDataList = List.of(trackData);
        // insert() returns a future; wait on it so failures surface before success is logged
        try (InsertResponse ignored = chClient.insert(TABLE_NAME, trackDataList, insertSettings).get()) {
            log.info("Inserted TrackData with id {}", trackData.getId());
        } catch (Exception e) {
            log.error("Failed to insert TrackData", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Inserts a batch of TrackData records.
     */
    public void insertBatch(List<TrackData> trackDataList) {
        try (InsertResponse insertResponse = chClient.insert(TABLE_NAME, trackDataList, insertSettings).get()) {
            log.info("Batch insert finished: {} rows written",
                    insertResponse.getMetrics().getMetric(ServerMetrics.NUM_ROWS_WRITTEN).getLong());
        } catch (Exception e) {
            log.error("Failed to insert TrackData batch", e);
            throw new RuntimeException(e);
        }
    }

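    /**
     * Queries records by object_id.
     */
    public List<TrackData> findByObjectId(String id) {
        List<TrackData> result = new ArrayList<>();
        // object_id is compared as a string (as in findByPage), so quote and escape the value
        String sql = "SELECT * FROM " + TABLE_NAME + " WHERE object_id = '"
                + id.replace("\\", "\\\\").replace("'", "\\'") + "'";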
        QuerySettings querySettings = new QuerySettings();
        try (QueryResponse response = chClient.query(sql, querySettings).get()) {
            ClickHouseBinaryFormatReader reader = chClient.newBinaryFormatReader(response);

            while (reader.hasNext()) {
                reader.next();

                TrackData trackData = buildTrackData(reader);

                result.add(trackData);
            }
        } catch (Exception e) {
            log.error("Failed to query TrackData by id", e);
            throw new RuntimeException(e);
        }
        return result;
    }


    /**
     * Queries records by geographic bounds, time range, and object_id, with pagination.
     */
    public Page<TrackData> findByPage(Double minLatitude, Double maxLatitude,
                                      Double minLongitude, Double maxLongitude,
                                      LocalDateTime startTime, LocalDateTime endTime,
                                      String objectId, Pageable pageable) {
        List<TrackData> result = new ArrayList<>();

        // Build the base query
        StringBuilder sqlBuilder = new StringBuilder("SELECT * FROM " + TABLE_NAME + " WHERE 1=1");

        // Add conditions for the geographic bounds
        if (minLatitude != null && maxLatitude != null) {
            sqlBuilder.append(" AND latitude BETWEEN ").append(minLatitude).append(" AND ").append(maxLatitude);
        }
        if (minLongitude != null && maxLongitude != null) {
            sqlBuilder.append(" AND longitude BETWEEN ").append(minLongitude).append(" AND ").append(maxLongitude);
        }

        // Add the time range condition, formatting LocalDateTime values as strings
        if (startTime != null && endTime != null) {
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
            String formattedStartTime = startTime.format(formatter);
            String formattedEndTime = endTime.format(formatter);

            sqlBuilder.append(" AND timestamp BETWEEN '")
                    .append(formattedStartTime)
                    .append("' AND '")
                    .append(formattedEndTime)
                    .append("'");
        }

        // Add the object_id condition, escaping backslashes and quotes in the value
        if (objectId != null) {
            sqlBuilder.append(" AND object_id = '")
                    .append(objectId.replace("\\", "\\\\").replace("'", "\\'"))
                    .append("'");
        }

        // Add pagination
        sqlBuilder.append(" LIMIT ").append(pageable.getPageSize()).append(" OFFSET ").append(pageable.getOffset());

        String sql = sqlBuilder.toString();
        log.info("Generated SQL: {}", sql);

        // Run the query
        try (QueryResponse response = chClient.query(sql).get(10, TimeUnit.SECONDS)) {
            TableSchema tableSchema = chClient.getTableSchema(TABLE_NAME);
            ClickHouseBinaryFormatReader reader = chClient.newBinaryFormatReader(response, tableSchema);

            while (reader.hasNext()) {
                reader.next();
                TrackData trackData = buildTrackData(reader);  // convert the current row into a TrackData object
                result.add(trackData);
            }
        } catch (Exception e) {
            log.error("Failed to query TrackData", e);
            throw new RuntimeException(e);
        }

        long count = 0L;
        String countSql = sql.replaceFirst("SELECT \\*", "SELECT COUNT(*)");
        countSql = countSql.substring(0, countSql.indexOf("LIMIT"));
        log.info("Generated Count SQL: {}", countSql);
        try (QueryResponse countResponse = chClient.query(countSql).get(10, TimeUnit.SECONDS)) {
            // The count result has a single column, so the table schema is not looked up here
            ClickHouseBinaryFormatReader countReader = chClient.newBinaryFormatReader(countResponse);
            if (countReader.hasNext()) {
                countReader.next();
                count = countReader.getLong(1);  // read the COUNT(*) result
            }
        } catch (Exception e) {
            log.error("Failed to query total count", e);
            throw new RuntimeException(e);
        }

        return new PageImpl<>(result, pageable, count);
    }

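    /**
     * Deletes records by id.
     * ALTER TABLE ... DELETE is a mutation: ClickHouse applies it asynchronously in the
     * background, so the rows may still be visible for a short time after this call returns.
     */
    public void deleteById(Long id) {
        String sql = "ALTER TABLE " + TABLE_NAME + " DELETE WHERE id = " + id;
        try {
            // execute() returns a future; wait on it so failures are not silently lost
            chClient.execute(sql).get(10, TimeUnit.SECONDS);
            log.info("Submitted delete for TrackData with id {}", id);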
        } catch (Exception e) {
            log.error("Failed to delete TrackData", e);
            throw new RuntimeException(e);
        }
    }

    @NotNull
    private static TrackData buildTrackData(ClickHouseBinaryFormatReader reader) {
        TrackData trackData = new TrackData();
        trackData.setId(reader.getString("id"));
        trackData.setDataType(reader.getString("data_type"));
        trackData.setObjectId(reader.getString("object_id"));
        trackData.setTimestamp(reader.getLocalDateTime("timestamp"));
        trackData.setLatitude(reader.getDouble("latitude"));
        trackData.setLongitude(reader.getDouble("longitude"));
        trackData.setMetadata(reader.getString("metadata"));
        Float speed = StrUtil.isBlankIfStr(reader.getString("speed")) ? null : reader.getFloat("speed");
        trackData.setSpeed(speed);
        Float heading = StrUtil.isBlankIfStr(reader.getString("heading")) ? null : reader.getFloat("heading");
        trackData.setHeading(heading);
        Float altitude = StrUtil.isBlankIfStr(reader.getString("altitude")) ? null : reader.getFloat("altitude");
        trackData.setAltitude(altitude);
        return trackData;
    }
}
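
The `insertStream` method above passes its input to ClickHouse as `JSONEachRow`, which means one JSON object per line. As a rough sketch of what such a stream looks like, the following plain-JDK example builds one from a list of maps. The class `JsonEachRowSketch` and its methods are hypothetical, the column names are taken from `buildTrackData`, and the escaping covers only the most common characters; in a real project a JSON library such as Jackson would normally do the serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JsonEachRowSketch {

    /** Escapes backslashes, double quotes, and common whitespace controls for a JSON string. */
    public static String escape(String s) {
        return s.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r")
                .replace("\t", "\\t");
    }

    /** Renders a value: null, numbers, and booleans stay unquoted, everything else is a string. */
    private static String value(Object v) {
        if (v == null) {
            return "null";
        }
        if (v instanceof Number || v instanceof Boolean) {
            return v.toString();
        }
        return "\"" + escape(v.toString()) + "\"";
    }

    /** Renders one row as a single-line JSON object. */
    public static String toJsonLine(Map<String, Object> row) {
        return row.entrySet().stream()
                .map(e -> "\"" + escape(e.getKey()) + "\":" + value(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    /** Writes each row on its own line, which is the framing JSONEachRow expects. */
    public static String toJsonEachRow(List<Map<String, Object>> rows) {
        return rows.stream()
                .map(row -> toJsonLine(row) + "\n")
                .collect(Collectors.joining());
    }

    /** Wraps the rendered rows in a UTF-8 stream that a method like insertStream could consume. */
    public static InputStream toStream(List<Map<String, Object>> rows) {
        return new ByteArrayInputStream(toJsonEachRow(rows).getBytes(StandardCharsets.UTF_8));
    }
}
```

A caller would then do something like `trackDataDao.insertStream(JsonEachRowSketch.toStream(rows))`, with the keys of each map matching the column names of the target table.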

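The query methods in the DAO assemble SQL by string concatenation and derive the count query by editing the page query text. One alternative, sketched below under stated assumptions, is to build the WHERE clause once and reuse it for both statements, quoting string values through a single helper. The class `TrackQuerySketch`, its method names, and the `ORDER BY timestamp` clause (added so that pages come back in a stable order) are illustrative and not part of the original code. If the client version in use supports parameterized queries, those are preferable to manual escaping.

```java
import java.util.ArrayList;
import java.util.List;

public class TrackQuerySketch {

    /** Wraps a value in single quotes, escaping backslashes and single quotes. */
    public static String quote(String value) {
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    /** Builds the shared WHERE clause; filters whose arguments are null are skipped. */
    public static String where(Double minLatitude, Double maxLatitude, String objectId) {
        List<String> conditions = new ArrayList<>();
        if (minLatitude != null && maxLatitude != null) {
            conditions.add("latitude BETWEEN " + minLatitude + " AND " + maxLatitude);
        }
        if (objectId != null) {
            conditions.add("object_id = " + quote(objectId));
        }
        return conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
    }

    /** Page query: the ORDER BY is an assumption that keeps page contents stable. */
    public static String pageSql(String table, String where, int pageSize, long offset) {
        return "SELECT * FROM " + table + where
                + " ORDER BY timestamp LIMIT " + pageSize + " OFFSET " + offset;
    }

    /** Count query built from the same WHERE clause, with no string surgery on the page query. */
    public static String countSql(String table, String where) {
        return "SELECT count() FROM " + table + where;
    }

    public static void main(String[] args) {
        // "track_data" is a placeholder table name for this sketch.
        String where = where(30.0, 31.0, "ship-1");
        System.out.println(pageSql("track_data", where, 20, 0));
        System.out.println(countSql("track_data", where));
    }
}
```

Because both statements share one WHERE clause, the total returned to `PageImpl` always matches the filters used for the page itself.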

Original article: https://blog.csdn.net/qq_38431321/article/details/145249999
