自学内容网 自学内容网

elasticsearch基于Rest Client API实现增删改查

本文主要记录如何在SpringBoot项目中使用Es的Rest client API实现基本的增删改查。如果你也刚好刷到这篇文章,希望对你有所帮助。

1. 项目中引入elasticsearch-rest-high-level-client依赖
 <!-- es 高级客户端依赖 -->
   <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
    </dependency>
2. 增加配置项和配置类

application.yml文件中增加端口配置

elasticsearch:
  clusterNodes: 127.0.0.1:9200

接下来通过@Configuration@Bean注解的形式将Rest client Bean注册到Spring上下文中

/**
 * @description: Elasticsearch 客户端bean
 * @date: 2024/12/3
 **/
@Configuration
@Slf4j
public class ElasticsearchClientConfiguration {
    /**
     * 建立连接超时时间
     */
    public static int CONNECT_TIMEOUT_MILLIS = 1000;
    /**
     * 数据传输过程中的超时时间
     */
    public static int SOCKET_TIMEOUT_MILLIS = 30000;
    /**
     * 从连接池获取连接的超时时间
     */
    public static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;

    /**
     * 路由节点的最大连接数
     */
    public static int MAX_CONN_PER_ROUTE = 10;
    /**
     * client最大连接数量
     */
    public static int MAX_CONN_TOTAL = 30;

    /**
     * es集群节点
     */
    @Value("${elasticsearch.clusterNodes}")
    private String clusterNodes;

    /**
     * es rest client的bean对象
     */
    private RestHighLevelClient restHighLevelClient;


    /**
     * es client bean
     *
     * @return restHighLevelClient es高级客户端
     */
    @Bean
    public RestHighLevelClient restClient() {
        // 创建restClient的构造器
        RestClientBuilder restClientBuilder = RestClient.builder(loadHttpHosts());
        // 设置连接超时时间等参数
        setConnectTimeOutConfig(restClientBuilder);
        setConnectConfig(restClientBuilder);
        restHighLevelClient = new RestHighLevelClient(restClientBuilder);
        return restHighLevelClient;
    }

    /**
     * 加载es集群节点,逗号分隔
     *
     * @return 集群
     */
    private HttpHost[] loadHttpHosts() {
        String[] clusterNodesArray = clusterNodes.split(StringPoolConstant.COMMA);
        HttpHost[] httpHosts = new HttpHost[clusterNodesArray.length];
        for (int i = 0; i < clusterNodesArray.length; i++) {
            String clusterNode = clusterNodesArray[i];
            String[] hostAndPort = clusterNode.split(StringPoolConstant.COLON);
            httpHosts[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
        }
        return httpHosts;
    }

    /**
     * 配置连接超时时间等参数
     *
     * @param restClientBuilder 创建restClient的构造器
     */
    private void setConnectTimeOutConfig(RestClientBuilder restClientBuilder) {
        restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
            requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
            requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
            return requestConfigBuilder;
        });
    }

    /**
     * 使用异步httpclient时设置并发连接数
     *
     * @param restClientBuilder 创建restClient的构造器
     */
    private void setConnectConfig(RestClientBuilder restClientBuilder) {
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(MAX_CONN_TOTAL);
            httpClientBuilder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
            return httpClientBuilder;
        });
    }

    @PreDestroy
    public void close() {
        if (restHighLevelClient != null) {
            try {
                log.info("Closing the ES REST client");
                restHighLevelClient.close();
            } catch (IOException e) {
                log.error("Problem occurred when closing the ES REST client", e);
            }
        }
    }

}

@PreDestroy 是 Java 中的一个注解,用于标记一个方法,该方法将在依赖注入的对象被销毁之前调用。通常用于清理资源、关闭连接等操作。这里用于标记 close 方法,确保在 ElasticsearchClientConfiguration 对象被销毁之前,close 方法会被调用以关闭 RestHighLevelClient 客户端。

3. 通过Rest Client API实现增删改查
/**
 * @description: Rest Client API CRUD测试
 * @date: 2024/12/3
 **/
@SpringBootTest(classes = EsApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class EsCRUDTest {

    @Autowired
    private RestHighLevelClient restClient;

    /**
     * es测试用的index
     */
    private final static String TEST_BOOK_INDEX = "test_book_index";

    /**
     * 检查索引是否存在
     * @param indexName 索引名称
     * @return  true 存在
     * @throws IOException 异常信息
     */
    private Boolean checkIndexExist(String indexName) throws IOException {
        GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
        return restClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
    }


    /**
     * 删除索引
     *
     * @param indexName 索引名称
     * @throws IOException 异常信息
     */
    private void deleteIndex(String indexName) throws IOException {
        try {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
            AcknowledgedResponse response = restClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
            log.info("delete index:{} response:{}", indexName, response.isAcknowledged());
        } catch (RuntimeException e) {
            log.info("delete index exception", e);
        }
    }

    /**
     * 创建索引
     * @param indexName 索引名称
     * @throws IOException 异常信息
     */
    private void createIndex(String indexName) throws IOException {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
        createIndexRequest.settings(Settings.builder()
                .put("index.number_of_shards", 5)
                .put("index.number_of_replicas", 1));
        createIndexRequest.mapping("{\"properties\":{\"bookName\":{\"type\":\"text\"}}}", XContentType.JSON);
        CreateIndexResponse createIndexResponse = restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        log.info("create index {} response:{}", indexName, createIndexResponse.isAcknowledged());
    }

    /**
     * 初始化 删除索引和创建索引
     *
     * @throws IOException 异常信息
     */
    @Before
    public void init() throws IOException {
        if(this.checkIndexExist(TEST_BOOK_INDEX)){
            log.info("index:{}已经存在, 无需重复创建", TEST_BOOK_INDEX);
        } else {
            // 创建索引
            this.createIndex(TEST_BOOK_INDEX);
        }
    }


    /**
     * 测试新增数据
     */
    @Test
    public void testInsert() throws IOException {
        Book data = new Book("1", "三国演义");
        this.insertData(data);
    }

    /**
     * 测试查询操作
     */
    @Test
    public void testQueryById() throws IOException {
        Book data = new Book("1", "三国演义");
        // 查询数据
        this.queryById(data);
    }

    /**
     * 测试修改操作
     */
    @Test
    public void testUpdate() throws IOException {
        // 插入数据
        Book data = new Book("2", "射雕英雄传");
        this.insertData(data);


        // 更新数据
        data.setBookName("书名被更新");
        UpdateRequest updateRequest = new UpdateRequest(TEST_BOOK_INDEX, data.getId());

        updateRequest.doc(JSONObject.toJSONString(data), XContentType.JSON);
        // 强制刷新数据
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        UpdateResponse updateResponse = restClient.update(updateRequest, RequestOptions.DEFAULT);
        Assert.assertEquals(updateResponse.status(), RestStatus.OK);

        // 查询更新结果
        this.queryById(data);

    }

    /**
     * 测试删除操作
     */
    @Test
    public void testDelete() throws IOException {
        Book data = new Book("3", "隋唐演义");
        this.insertData(data);

        // 删除数据
        DeleteRequest deleteRequest = new DeleteRequest(TEST_BOOK_INDEX, data.getId());
        // 强制刷新
        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        DeleteResponse deleteResponse = restClient.delete(deleteRequest, RequestOptions.DEFAULT);
        Assert.assertEquals(deleteResponse.status(), RestStatus.OK);

        // 查询数据

        SearchRequest searchRequest = new SearchRequest(TEST_BOOK_INDEX);
        searchRequest.source(new SearchSourceBuilder()
                .query(QueryBuilders.termQuery("id", data.getId()))
        );
        SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
        Assert.assertEquals(searchResponse.status(), RestStatus.OK);

        // 查询条数为1
        SearchHits hits = searchResponse.getHits();
        Assert.assertEquals(0, hits.getTotalHits().value);
    }

    /**
     * 根据id查询数据
     *
     * @param expectedData 查询数据
     * @throws IOException 异常信息
     */
    private void queryById(Book expectedData) throws IOException {
        SearchRequest searchRequest = new SearchRequest(TEST_BOOK_INDEX);
        searchRequest.source(SearchSourceBuilder.searchSource()
                .query(QueryBuilders.termQuery("id", expectedData.getId())));
        SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
        Assert.assertEquals(searchResponse.status(), RestStatus.OK);

        // 查询条数为1
        SearchHits hits = searchResponse.getHits();
        Assert.assertEquals(1, hits.getTotalHits().value);

        // 判断查询数据和插入数据是否相等
        String dataJson = hits.getHits()[0].getSourceAsString();
        Assert.assertEquals(JSON.toJSONString(expectedData), dataJson);
    }



    /**
     * 插入数据
     *
     * @param data 数据
     * @throws IOException 异常信息
     */
    private void insertData(Book data) throws IOException {
        IndexRequest indexRequest = new IndexRequest(TEST_BOOK_INDEX);
        indexRequest.id(data.getId());
        // 强制刷新数据
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        indexRequest.source(JSONObject.toJSONString(data), XContentType.JSON);
        IndexResponse indexResponse = restClient.index(indexRequest, RequestOptions.DEFAULT);
        Assert.assertEquals(indexResponse.status(), RestStatus.CREATED);
    }

    @Data
    @Builder
    static class Book {
        private String id;
        private String bookName;
    }

}

上面单元测试用例中包含了索引创建、索引删除、新增数据、查询数据、删除数据、检查索引等基本操作。


原文地址:https://blog.csdn.net/heanfeiliu/article/details/144452555

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!