自学内容网 自学内容网

Golang操作ES全系列(olivere & curl操作es)

Golang操作ES全系列(olivere & curl操作es)

🚀全部代码(欢迎👏🏻star):

  • https://github.com/ziyifast/ziyifast-code_instruction/tree/main/go-demo/go-es

1 olivere

创建client

package main

import (
"crypto/tls"
"fmt"
"github.com/olivere/elastic/v7"
"net"
"net/http"
"time"
)

// host is the base URL of the Elasticsearch endpoint the client connects to.
// For a local single-node cluster use "http://localhost:9200" instead.
var host = "http://es.xx.ziyi.com"

func main() {
esClient, err := elastic.NewClient(
elastic.SetURL(host),
elastic.SetSniff(false),
elastic.SetBasicAuth("", ""),
elastic.SetHttpClient(&http.Client{Transport: &DecoratedTransport{
tp: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout:   30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2:     true,
MaxIdleConns:          100,
IdleConnTimeout:       90 * time.Second,
TLSHandshakeTimeout:   10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}}),
)
if err != nil {
panic(err)
}
fmt.Println(esClient)
}

// DecoratedTransport wraps another http.RoundTripper and forces the Host
// header of every outgoing request to a fixed value.
type DecoratedTransport struct {
	tp http.RoundTripper // underlying transport that actually sends the request
}

// RoundTrip sends a copy of request, with Host set to "es.xx.ziyi.com",
// through the wrapped transport and returns its response and error unchanged.
//
// The request is cloned first because the http.RoundTripper contract forbids
// modifying the caller's request.
func (d *DecoratedTransport) RoundTrip(request *http.Request) (*http.Response, error) {
	outgoing := request.Clone(request.Context())
	outgoing.Host = "es.xx.ziyi.com"
	return d.tp.RoundTrip(outgoing)
}

在这里插入图片描述

检测索引是否存在

// isExistIndex checks whether the index "test-ziyi-1-100004-100136" exists and
// prints the outcome with the builtin println. It panics if the check fails.
func isExistIndex(esClient *elastic.Client) {
	const indexName = "test-ziyi-1-100004-100136"

	found, err := esClient.IndexExists(indexName).Do(context.TODO())
	if err != nil {
		panic(err)
	}
	msg := "index not exists"
	if found {
		msg = "index exists"
	}
	println(msg)
}

创建索引

// createIndex creates the index "test-ziyi-1-100004-100136" with explicit
// settings and field mappings, then prints whether the request was
// acknowledged. It panics if the request fails.
func createIndex(esClient *elastic.Client) {
	type object = map[string]interface{}

	settings := object{
		"number_of_shards":   5, // primary shard count
		"number_of_replicas": 1, // replica count
	}
	// One entry per document field; the key is the field name.
	properties := object{
		"book_name": object{
			"type": "text",
			// "analyzer": "ik_max_word", // enable to tokenize with the IK analyzer
			"index": true,  // the field may be used in query conditions
			"store": false, // do not store the field separately
		},
		"author": object{
			"type": "keyword", // kept as a single term, not analyzed
		},
		"word_count": object{
			"type": "long",
		},
		"on_sale_time": object{
			"type":   "date",
			"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
		},
		"book_desc": object{
			"type": "text",
			// "analyzer": "ik_max_word",
		},
	}
	body := object{
		"settings": settings,
		"mappings": object{"properties": properties},
	}

	resp, err := esClient.CreateIndex("test-ziyi-1-100004-100136").
		BodyJson(body).
		Do(context.Background())
	if err != nil {
		panic(err)
	}
	if !resp.Acknowledged {
		println("create index failed")
		return
	}
	println("create index success")
}

删除索引

# Besides the Go program, curl can be used to check whether the index exists.
# The URL is quoted so the shell does not treat "?" as a glob character.
curl "http://localhost:9200/_cat/indices?v" | grep ziyi

在这里插入图片描述

// deleteIndex deletes the index "test-ziyi-1-100004-100136" and prints the
// Acknowledged flag of the response. It panics if the request fails.
func deleteIndex(esClient *elastic.Client) {
	deleteSvc := esClient.DeleteIndex("test-ziyi-1-100004-100136")
	ack, err := deleteSvc.Do(context.Background())
	if err != nil {
		panic(err)
	}
	println(ack.Acknowledged)
}

添加记录doc

# _doc/1 selects the document whose id is 1
# test-ziyi-1-100004-100136 is the index name
curl -X GET "http://localhost:9200/test-ziyi-1-100004-100136/_doc/1?pretty"

在这里插入图片描述

// addDoc indexes one book document under id "1" in the index
// "test-ziyi-1-100004-100136" and prints the id reported by the response.
// It panics if the request fails.
func addDoc(esClient *elastic.Client) {
	book := map[string]interface{}{
		"book_name":    "士兵突击",
		"author":       "兰晓龙",
		"word_count":   100000,
		"on_sale_time": "2000-01-05",
		"book_desc":    "一个关于部队的...",
	}
	// The id is set explicitly. Per the original author's note, leaving out
	// Id(...) makes ES generate an id itself, which is harder to maintain:
	//   esClient.Index().Index("test-ziyi-1-100004-100136").BodyJson(book).Do(ctx)
	indexReq := esClient.Index().
		Index("test-ziyi-1-100004-100136").
		Id("1").
		BodyJson(book)
	resp, err := indexReq.Do(context.Background())
	if err != nil {
		panic(err)
	}
	println(resp.Id)
}

更新doc记录

在这里插入图片描述

// updateDoc updates the document with id "1" in the index
// "test-ziyi-1-100004-100136" and prints the id reported by the response.
// It panics if the request fails.
func updateDoc(esClient *elastic.Client) {
	changes := map[string]interface{}{
		"book_name":    "士兵突击",
		"author":       "袁朗",
		"word_count":   100000,
		"on_sale_time": "2000-01-05",
		"book_desc":    "一个关于部队的...",
	}
	// All five fields are supplied, so each one ends up with the value above.
	// To change a single field, pass only that field, e.g.
	//   Doc(map[string]interface{}{"book_name": "我的团长我的团"})
	// NOTE(review): Update().Doc() is understood to merge fields into the
	// existing document rather than replace it; confirm against the ES update API.
	updateReq := esClient.Update().
		Index("test-ziyi-1-100004-100136").
		Id("1").
		Doc(changes)
	resp, err := updateReq.Do(context.Background())
	if err != nil {
		panic(err)
	}
	println(resp.Id)
}

删除doc记录

// deleteDoc deletes the document with id "1" from the index
// "test-ziyi-1-100004-100136" and prints the id reported by the response.
// It panics on any error; per the original author's note, deleting a document
// that does not exist yields a not-found error.
func deleteDoc(esClient *elastic.Client) {
	deleteReq := esClient.Delete().
		Index("test-ziyi-1-100004-100136").
		Id("1")
	resp, err := deleteReq.Do(context.Background())
	if err != nil {
		panic(err)
	}
	println(resp.Id)
}

批处理

// sendBulkRequest indexes ten generated book documents into
// "test-ziyi-1-100004-100136" with a single bulk request.
//
// It panics if the request itself fails. When the response reports per-item
// errors, each failing item is printed on its own line; otherwise a success
// message is printed.
func sendBulkRequest(esClient *elastic.Client) {
	bulkRequest := esClient.Bulk()
	for i := 0; i < 10; i++ {
		docMappings := map[string]interface{}{
			"book_name":    fmt.Sprintf("士兵突击-%d", i),
			"author":       "袁朗",
			"on_sale_time": "2000-01-05",
			"book_desc":    "一个关于部队的...",
		}
		// No id is set on the index requests.
		bulkRequest = bulkRequest.Add(elastic.NewBulkIndexRequest().Index("test-ziyi-1-100004-100136").Doc(docMappings))
	}
	bulkResponse, err := bulkRequest.Do(context.Background())
	if err != nil {
		panic(err)
	}
	if !bulkResponse.Errors {
		fmt.Println("All bulk requests executed successfully")
		return
	}
	// The request succeeded as a whole but some items failed; report each one.
	for _, item := range bulkResponse.Items {
		for _, action := range item {
			if action.Error != nil {
				// Trailing newline keeps multiple failures from running together.
				fmt.Printf("Error for item: %s: %s\n", action.Error.Index, action.Error.Reason)
			}
		}
	}
}

普通查询

// simpleSearch runs a term query for author "袁朗" against the index
// "test-ziyi-1-100004-100136", asking for up to 100 hits, and prints the hit
// slice. It panics if the search fails.
func simpleSearch(esClient *elastic.Client) {
	byAuthor := elastic.NewTermQuery("author", "袁朗")
	result, err := esClient.Search("test-ziyi-1-100004-100136").
		Query(byAuthor).
		Size(100).
		Do(context.TODO())
	if err != nil {
		panic(err)
	}
	fmt.Println(result.Hits.Hits)
}

在这里插入图片描述

searchAfter翻页查询

// searchAfterSearch pages through every document in
// "test-ziyi-1-100004-100136" whose book_name matches the term "士", two hits
// per page, sorted by _id descending. The sort values of the last hit of each
// page are passed as search_after for the next page. For every page it prints
// the request DSL and each hit's id and author. It panics on any error and
// stops at the first empty page.
func searchAfterSearch(esClient *elastic.Client) {
	const (
		indexName = "test-ziyi-1-100004-100136"
		pageSize  = 2
	)
	query := elastic.NewBoolQuery().Must(elastic.NewTermQuery("book_name", "士"))

	// prev is the last hit of the previous page; nil while on the first page.
	var prev *elastic.SearchHit
	for {
		source := elastic.NewSearchSource().Query(query).Size(pageSize).Sort("_id", false)
		if prev != nil {
			fmt.Printf("search After %+v\n", prev.Sort)
			source.SearchAfter(prev.Sort...)
		}

		rawDSL, err := source.MarshalJSON()
		if err != nil {
			panic(err)
		}
		fmt.Printf("dsl %s\n", string(rawDSL))

		page, err := esClient.Search().Index(indexName).SearchSource(source).Do(context.Background())
		if err != nil {
			panic(err)
		}
		hits := page.Hits.Hits
		if len(hits) == 0 {
			fmt.Println("no more data")
			return
		}
		for _, hit := range hits {
			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				panic(err)
			}
			fmt.Printf("search %s %s\n", hit.Id, doc["author"])
		}
		prev = hits[len(hits)-1]
	}
}

全部代码

package main

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"net"
"net/http"
"time"
)

// host is the base URL of the Elasticsearch endpoint the client connects to.
var host = "http://test.ziyi.com"

// main creates the Elasticsearch client and prints it. Uncomment one of the
// calls below to run the corresponding example.
func main() {
	client := CreateEsClient()
	fmt.Println(client)

	// 1. Index operations
	// isExistIndex(client)
	// createIndex(client)
	// deleteIndex(client)

	// 2. Document (record) operations
	// addDoc(client)
	// updateDoc(client)
	// deleteDoc(client)

	// 3. Bulk request
	// sendBulkRequest(client)

	// 4. Queries
	// simpleSearch(client)
	// searchAfterSearch(client)
}

// searchAfterSearch pages through every document in
// "test-ziyi-1-100004-100136" whose book_name matches the term "士", two hits
// per page, sorted by _id descending. The sort values of the last hit of each
// page are passed as search_after for the next page. For every page it prints
// the request DSL and each hit's id and author. It panics on any error and
// stops at the first empty page.
func searchAfterSearch(esClient *elastic.Client) {
	const (
		indexName = "test-ziyi-1-100004-100136"
		pageSize  = 2
	)
	query := elastic.NewBoolQuery().Must(elastic.NewTermQuery("book_name", "士"))

	// prev is the last hit of the previous page; nil while on the first page.
	var prev *elastic.SearchHit
	for {
		source := elastic.NewSearchSource().Query(query).Size(pageSize).Sort("_id", false)
		if prev != nil {
			fmt.Printf("search After %+v\n", prev.Sort)
			source.SearchAfter(prev.Sort...)
		}

		rawDSL, err := source.MarshalJSON()
		if err != nil {
			panic(err)
		}
		fmt.Printf("dsl %s\n", string(rawDSL))

		page, err := esClient.Search().Index(indexName).SearchSource(source).Do(context.Background())
		if err != nil {
			panic(err)
		}
		hits := page.Hits.Hits
		if len(hits) == 0 {
			fmt.Println("no more data")
			return
		}
		for _, hit := range hits {
			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				panic(err)
			}
			fmt.Printf("search %s %s\n", hit.Id, doc["author"])
		}
		prev = hits[len(hits)-1]
	}
}

// simpleSearch runs a term query for author "袁朗" against the index
// "test-ziyi-1-100004-100136", asking for up to 100 hits, and prints the hit
// slice. It panics if the search fails.
func simpleSearch(esClient *elastic.Client) {
	byAuthor := elastic.NewTermQuery("author", "袁朗")
	result, err := esClient.Search("test-ziyi-1-100004-100136").
		Query(byAuthor).
		Size(100).
		Do(context.TODO())
	if err != nil {
		panic(err)
	}
	fmt.Println(result.Hits.Hits)
}

// sendBulkRequest indexes ten generated book documents into
// "test-ziyi-1-100004-100136" with a single bulk request.
//
// It panics if the request itself fails. When the response reports per-item
// errors, each failing item is printed on its own line; otherwise a success
// message is printed.
func sendBulkRequest(esClient *elastic.Client) {
	bulkRequest := esClient.Bulk()
	for i := 0; i < 10; i++ {
		docMappings := map[string]interface{}{
			"book_name":    fmt.Sprintf("士兵突击-%d", i),
			"author":       "aa",
			"on_sale_time": "2000-01-05",
			"book_desc":    "一个关于部队的...",
		}
		// No id is set on the index requests.
		bulkRequest = bulkRequest.Add(elastic.NewBulkIndexRequest().Index("test-ziyi-1-100004-100136").Doc(docMappings))
	}
	bulkResponse, err := bulkRequest.Do(context.Background())
	if err != nil {
		panic(err)
	}
	if !bulkResponse.Errors {
		fmt.Println("All bulk requests executed successfully")
		return
	}
	// The request succeeded as a whole but some items failed; report each one.
	for _, item := range bulkResponse.Items {
		for _, action := range item {
			if action.Error != nil {
				// Trailing newline keeps multiple failures from running together.
				fmt.Printf("Error for item: %s: %s\n", action.Error.Index, action.Error.Reason)
			}
		}
	}
}

// deleteDoc deletes the document with id "1" from the index
// "test-ziyi-1-100004-100136" and prints the id reported by the response.
// It panics on any error; per the original author's note, deleting a document
// that does not exist yields a not-found error.
func deleteDoc(esClient *elastic.Client) {
	deleteReq := esClient.Delete().
		Index("test-ziyi-1-100004-100136").
		Id("1")
	resp, err := deleteReq.Do(context.Background())
	if err != nil {
		panic(err)
	}
	println(resp.Id)
}

// updateDoc updates the document with id "1" in the index
// "test-ziyi-1-100004-100136" and prints the id reported by the response.
// It panics if the request fails.
func updateDoc(esClient *elastic.Client) {
	changes := map[string]interface{}{
		"book_name":    "士兵突击",
		"author":       "袁朗",
		"word_count":   100000,
		"on_sale_time": "2000-01-05",
		"book_desc":    "一个关于部队的...",
	}
	// All five fields are supplied, so each one ends up with the value above.
	// To change a single field, pass only that field, e.g.
	//   Doc(map[string]interface{}{"book_name": "我的团长我的团"})
	// NOTE(review): Update().Doc() is understood to merge fields into the
	// existing document rather than replace it; confirm against the ES update API.
	updateReq := esClient.Update().
		Index("test-ziyi-1-100004-100136").
		Id("1").
		Doc(changes)
	resp, err := updateReq.Do(context.Background())
	if err != nil {
		panic(err)
	}
	println(resp.Id)
}

// addDoc indexes one book document under id "1" in the index
// "test-ziyi-1-100004-100136" and prints the id reported by the response.
// It panics if the request fails.
func addDoc(esClient *elastic.Client) {
	book := map[string]interface{}{
		"book_name":    "士兵突击",
		"author":       "兰晓龙",
		"word_count":   100000,
		"on_sale_time": "2000-01-05",
		"book_desc":    "一个关于部队的...",
	}
	// The id is set explicitly. Per the original author's note, leaving out
	// Id(...) makes ES generate an id itself, which is harder to maintain:
	//   esClient.Index().Index("test-ziyi-1-100004-100136").BodyJson(book).Do(ctx)
	indexReq := esClient.Index().
		Index("test-ziyi-1-100004-100136").
		Id("1").
		BodyJson(book)
	resp, err := indexReq.Do(context.Background())
	if err != nil {
		panic(err)
	}
	println(resp.Id)
}

// deleteIndex deletes the index "test-ziyi-1-100004-100136" and prints the
// Acknowledged flag of the response. It panics if the request fails.
func deleteIndex(esClient *elastic.Client) {
	deleteSvc := esClient.DeleteIndex("test-ziyi-1-100004-100136")
	ack, err := deleteSvc.Do(context.Background())
	if err != nil {
		panic(err)
	}
	println(ack.Acknowledged)
}

// createIndex creates the index "test-ziyi-1-100004-100136" with explicit
// settings and field mappings, then prints whether the request was
// acknowledged. It panics if the request fails.
func createIndex(esClient *elastic.Client) {
	type object = map[string]interface{}

	settings := object{
		"number_of_shards":   5, // primary shard count
		"number_of_replicas": 1, // replica count
	}
	// One entry per document field; the key is the field name.
	properties := object{
		"book_name": object{
			"type": "text",
			// "analyzer": "ik_max_word", // enable to tokenize with the IK analyzer
			"index": true,  // the field may be used in query conditions
			"store": false, // do not store the field separately
		},
		"author": object{
			"type": "keyword", // kept as a single term, not analyzed
		},
		"word_count": object{
			"type": "long",
		},
		"on_sale_time": object{
			"type":   "date",
			"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
		},
		"book_desc": object{
			"type": "text",
			// "analyzer": "ik_max_word",
		},
	}
	body := object{
		"settings": settings,
		"mappings": object{"properties": properties},
	}

	resp, err := esClient.CreateIndex("test-ziyi-1-100004-100136").
		BodyJson(body).
		Do(context.Background())
	if err != nil {
		panic(err)
	}
	if !resp.Acknowledged {
		println("create index failed")
		return
	}
	println("create index success")
}

// isExistIndex checks whether the index "test-ziyi-1-100004-100136" exists and
// prints the outcome with the builtin println. It panics if the check fails.
func isExistIndex(esClient *elastic.Client) {
	const indexName = "test-ziyi-1-100004-100136"

	found, err := esClient.IndexExists(indexName).Do(context.TODO())
	if err != nil {
		panic(err)
	}
	msg := "index not exists"
	if found {
		msg = "index exists"
	}
	println(msg)
}

// CreateEsClient builds an olivere/elastic client for host whose HTTP traffic
// goes through a DecoratedTransport. It panics if the client cannot be created.
func CreateEsClient() *elastic.Client {
	dialer := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	baseTransport := &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dialer.DialContext,
		ForceAttemptHTTP2:     true,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		// NOTE(review): certificate verification is switched off here; confirm
		// this is only used against trusted test endpoints.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	httpClient := &http.Client{Transport: &DecoratedTransport{tp: baseTransport}}

	client, err := elastic.NewClient(
		elastic.SetURL(host),
		elastic.SetSniff(false),
		elastic.SetBasicAuth("", ""),
		elastic.SetHttpClient(httpClient),
	)
	if err != nil {
		panic(err)
	}
	return client
}

// DecoratedTransport wraps another http.RoundTripper and forces the Host
// header of every outgoing request to a fixed value.
type DecoratedTransport struct {
	tp http.RoundTripper // underlying transport that actually sends the request
}

// RoundTrip sends a copy of request, with Host set to "test.ziyi.com",
// through the wrapped transport and returns its response and error unchanged.
//
// The request is cloned first because the http.RoundTripper contract forbids
// modifying the caller's request.
func (d *DecoratedTransport) RoundTrip(request *http.Request) (*http.Response, error) {
	outgoing := request.Clone(request.Context())
	outgoing.Host = "test.ziyi.com"
	return d.tp.RoundTrip(outgoing)
}

2 go-elasticsearch

searchAfter翻页查询

package main

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/cenkalti/backoff/v4"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/estransport"
"net"
"net/http"
"os"
"strings"
"time"
)

// Package-level settings for the search_after example.
var (
	// url lists the Elasticsearch addresses handed to the client.
	url = []string{"http://test.ziyi.com"}
	// indices names the indices that are searched.
	indices = []string{"test-ziyi-1-100004-100136"}

	// username and password are the credentials passed to the client; both empty.
	username, password string

	// size is the number of hits requested per page.
	size = 2
	// sort is the raw JSON sort clause: _id descending.
	sort = json.RawMessage(`[{"_id":{"order":"desc"}}]`)
	// aggs is a raw JSON aggregation clause; nothing in this listing references it.
	aggs = json.RawMessage(`{"size": {"sum": {"field": "size"}},"count":{"value_count": {"field": "_id"}}}`)
)

// main pages through the configured indices with search_after, using the
// go-elasticsearch client, and prints the id and book_name of every hit.
//
// Each iteration sends one search request of `size` hits sorted by `sort`; the
// sort values of the last hit become the search_after cursor of the next
// request. The loop ends on the first empty page. Any failure, including a
// non-2xx response from Elasticsearch, panics.
func main() {
	esClient, err := CreateClient(url, username, password)
	if err != nil {
		panic(err)
	}

	// searchPage mirrors only the parts of the search response read below.
	type searchPage struct {
		Hits struct {
			Hits []struct {
				Index  string                 `json:"_index"`
				ID     string                 `json:"_id"`
				Sort   []interface{}          `json:"sort"`
				Source map[string]interface{} `json:"_source"`
			} `json:"hits"`
		} `json:"hits"`
	}

	// searchAfter stays nil for the first page, so the omitempty tag on Dsl
	// leaves search_after out of the first request.
	var searchAfter []interface{}

	for {
		dsl := Dsl{
			Sort:        sort,
			Size:        size,
			SearchAfter: searchAfter,
			Query: map[string]interface{}{
				"bool": map[string]interface{}{
					"must": map[string]interface{}{
						// NOTE(review): the pattern has no "*" or "?", so this
						// presumably matches only the exact term; confirm intent.
						"wildcard": map[string]interface{}{
							"book_name": "士",
						},
						//"match_all": map[string]interface{}{},
					},
				},
			},
		}
		queryJson, err := json.MarshalIndent(dsl, "", "\t")
		if err != nil {
			panic(err)
		}
		fmt.Printf("queryJson:%s\n", queryJson)

		res, err := esClient.Search(
			esClient.Search.WithContext(context.Background()),
			esClient.Search.WithIndex(indices...),
			esClient.Search.WithBody(strings.NewReader(string(queryJson))),
			esClient.Search.WithTrackTotalHits(false),
		)
		if err != nil {
			panic(err)
		}
		// An error response would decode to zero hits and be mistaken for the
		// end of the result set, so fail loudly instead.
		if res.IsError() {
			failure := res.String()
			res.Body.Close()
			panic(fmt.Sprintf("search request failed: %s", failure))
		}

		var result searchPage
		decodeErr := json.NewDecoder(res.Body).Decode(&result)
		// Close the body whether or not decoding succeeded.
		closeErr := res.Body.Close()
		if decodeErr != nil {
			panic(decodeErr)
		}
		if closeErr != nil {
			panic(closeErr)
		}

		hits := result.Hits.Hits
		if len(hits) == 0 {
			break
		}
		for _, h := range hits {
			fmt.Printf("=====id:%s book_name:%s\n", h.ID, h.Source["book_name"])
		}
		searchAfter = hits[len(hits)-1].Sort
	}
}

// Dsl is the JSON body of a search request as sent by main.
type Dsl struct {
// Sort is the raw JSON sort clause, emitted as "sort".
Sort        json.RawMessage        `json:"sort"`
// Size is emitted as "size".
Size        int                    `json:"size"`
// SearchAfter is emitted as "search_after" and left out when empty.
SearchAfter []interface{}          `json:"search_after,omitempty"`
// Query is the query clause, emitted as "query".
Query       map[string]interface{} `json:"query"`
}

// CreateClient builds a go-elasticsearch client from the configuration
// produced by genConfig and probes the cluster once with an Info request.
//
// It returns a nil client and a non-nil error if the client cannot be
// constructed, the Info request fails, or Elasticsearch answers the Info
// request with an error status.
func CreateClient(url []string, username, password string) (*elasticsearch.Client, error) {
	es, err := elasticsearch.NewClient(genConfig(url, username, password))
	if err != nil {
		return nil, fmt.Errorf("creating elasticsearch client: %w", err)
	}
	// Probe the cluster so connectivity and credential problems surface here
	// rather than on the first real request.
	res, err := es.Info()
	if err != nil {
		return nil, fmt.Errorf("querying cluster info: %w", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return nil, fmt.Errorf("querying cluster info: unexpected status %s", res.Status())
	}
	return es, nil
}

// DecoratedTransport wraps another http.RoundTripper and forces the Host
// header of every outgoing request to a fixed value.
type DecoratedTransport struct {
	tp http.RoundTripper // underlying transport that actually sends the request
}

// RoundTrip sends a copy of request, with Host set to "test.ziyi.com",
// through the wrapped transport and returns its response and error unchanged.
//
// The request is cloned first because the http.RoundTripper contract forbids
// modifying the caller's request.
func (d *DecoratedTransport) RoundTrip(request *http.Request) (*http.Response, error) {
	outgoing := request.Clone(request.Context())
	outgoing.Host = "test.ziyi.com"
	return d.tp.RoundTrip(outgoing)
}

// genConfig assembles the go-elasticsearch configuration for the given
// addresses and credentials: requests are logged in colour to stdout, the
// statuses 502, 503, 504 and 429 are retried up to five times with exponential
// backoff, and all traffic goes through a DecoratedTransport.
func genConfig(url []string, username, password string) elasticsearch.Config {
	expBackoff := backoff.NewExponentialBackOff()
	// nextDelay restarts the backoff sequence on the first attempt and
	// otherwise returns the next interval.
	// NOTE(review): expBackoff is shared by every request made with this
	// config; confirm that is safe under concurrent use.
	nextDelay := func(attempt int) time.Duration {
		if attempt == 1 {
			expBackoff.Reset()
		}
		return expBackoff.NextBackOff()
	}

	dialer := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	baseTransport := &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dialer.DialContext,
		ForceAttemptHTTP2:     true,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		// NOTE(review): certificate verification is switched off here; confirm
		// this is only used against trusted test endpoints.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}

	return elasticsearch.Config{
		Addresses:     url,
		Logger:        &estransport.ColorLogger{Output: os.Stdout},
		Username:      username,
		Password:      password,
		RetryOnStatus: []int{502, 503, 504, 429},
		RetryBackoff:  nextDelay,
		MaxRetries:    5,
		Transport:     &DecoratedTransport{tp: baseTransport},
	}
}

3 拓展

es基础概念

索引index(database)

类型type(table),6.x后弃用

文档doc(row)

属性field(column)

curl操作es

# URLs containing "*" or "?" are quoted so the shell does not glob-expand them.
curl localhost:9200/_cluster/health # cluster health status
curl localhost:9200/_cat/pending_tasks # details of queued pending tasks
curl localhost:9200/_cluster/state/metadata # cluster metadata state
curl localhost:9200/_cluster/stats # cluster statistics
curl localhost:9200/_cluster/allocation/explain # shard allocation details
curl "http://localhost:9200/test-*/_count" # total document (record) count
curl localhost:9200/_cluster/settings # cluster settings
curl localhost:9200/_tasks
curl http://localhost:9200/test-ziyi-1-100000-218/_mapping # index mapping
curl -X GET "http://es.test.ziyi.com/test-ziyi-1-100004-100136/_doc/1?pretty" # fetch the document with id 1
curl "http://localhost:9200/_cat/indices?v" # per-index document counts; ?v adds the header row
curl -X DELETE "http://192.168.100.88:9200/my_index" # delete the index, including its structure
# Delete the index's data but keep the index structure
curl -X POST \
     -H 'Content-Type: application/json' \
     -d '{"query":{"match_all":{}}}' \
     'http://localhost:9200/test-ziyi-metadata-1-100004-100136/_delete_by_query?pretty=true'

# Run this query from inside the ES pod
curl -X POST "http://your_elasticsearch_host:9200/test-ziyi-metadata-1-10000-3/_search" -H 'Content-Type: application/json' -d '
{
  "size": 100,
  "sort": [
    { "mtime": { "order": "desc" } },
    { "key": { "order": "desc" } }
  ],
  "query": {
    "range": {
      "size": {
        "gt": 10485760
      }
    }
  }
}'

例如:查看索引信息
在这里插入图片描述


原文地址:https://blog.csdn.net/weixin_45565886/article/details/140387576

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!