
Querying oversized ES log entries with Python, then compressing the logs in a targeted way

Preface

As systems keep growing, so does the volume of their logs; many services now produce more than 1 TB of log data per day. It is therefore worth measuring which log entries are oversized and then optimizing them.

Python script

from elasticsearch import Elasticsearch
import datetime

index_name = "driver-order-service-pro-*"  # replace with your index name or pattern
fileName = "driverOrderBigLog.txt"
# Replace with the time window you want to analyze
start_date = datetime.datetime(2024, 12, 9, 17, 0, 0)
end_date = datetime.datetime(2024, 12, 9, 18, 0, 0)
maxPages = 1000  # total number of pages to fetch; the time window plus the page count bound the sample

# Convert local time (UTC+8) to UTC, since @timestamp is stored in UTC
start = (start_date - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
end = (end_date - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")


def paginated_search(es_client, index_name, query, page_size=10, max_pages=None):
    """
    Page through Elasticsearch results and write oversized log messages to a file.

    :param es_client: Elasticsearch client instance
    :param index_name: index (or index pattern) to query
    :param query: query DSL dict
    :param page_size: number of hits per page, default 10
    :param max_pages: maximum number of pages to fetch, default None (fetch all pages)
    """
    page_number = 1

    while True:
        # Plain from/size pagination. Elasticsearch caps from + size at 10,000 hits
        # by default (index.max_result_window), which exactly fits the default
        # sample of 1000 pages x 10 hits per page.
        search_body = {
            "query": query,
            "size": page_size,
            "from": (page_number - 1) * page_size,
            "_source": True  # optionally restrict which fields are returned
        }

        try:
            response = es_client.search(index=index_name, body=search_body)
            hits = response['hits']['hits']

            if not hits:
                break

            for hit in hits:
                result = hit['_source'].get('message', '')
                # Deduplication can be added here, e.g. skip messages that share a
                # prefix with one already written, or that contain a given keyword.
                if len(result) > 5000:
                    with open(fileName, 'a+', encoding='utf-8') as f2:
                        f2.write(result + "\n\n")

            # Move on to the next page
            page_number += 1

            # Stop once the maximum number of pages has been reached
            if max_pages and page_number > max_pages:
                break

        except Exception as e:
            print(f"Error occurred during search: {e}")
            break


# Example usage
if __name__ == "__main__":
    es = Elasticsearch(hosts="http://X.X.X.X:9200/", http_auth=('userName', 'password'))  # replace with your Elasticsearch address and credentials

    query = {
        "bool": {
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "format": "strict_date_optional_time",
                            "gte": start,
                            "lte": end
                        }
                    }
                }
            ],
            "must": [],
            "must_not": [],
            "should": []
        }
    }

    paginated_search(es, index_name, query, page_size=10, max_pages=maxPages)
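
The from/size pagination above is capped at 10,000 hits per query by default (index.max_result_window). If you need to sample a deeper window, one option is the official Python client's elasticsearch.helpers.scan helper, which drives the scroll API for you. The snippet below is only a minimal sketch of the same filtering loop on top of it: it reuses the es, index_name, query, and fileName variables defined in the script above, and the 500-hit batch size is an arbitrary choice.

from elasticsearch.helpers import scan

# scan() drives the scroll API under the hood and yields every matching hit,
# so it is not limited by index.max_result_window.
for hit in scan(es, index=index_name, query={"query": query}, scroll="2m", size=500):
    message = hit['_source'].get('message', '')
    if len(message) > 5000:
        with open(fileName, 'a+', encoding='utf-8') as f:
            f.write(message + "\n\n")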

Follow-up

The script above dumps the oversized log entries to a file. You can then go to the service that emits them and change how those statements are logged, for example by dropping them entirely or by logging only the key information, which is what actually compresses the logs.
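
As one way to implement "log only the key information" on the service side, overly long messages can be truncated at the logging layer. The sketch below uses Python's standard logging module purely as an illustration (the service in question may well be written in another language); the TruncateLongMessages class name, the 2000-character limit, and the logger name are arbitrary choices.

import logging

class TruncateLongMessages(logging.Filter):
    """Truncate log messages longer than max_len instead of dropping them."""

    def __init__(self, max_len=2000):
        super().__init__()
        self.max_len = max_len

    def filter(self, record):
        msg = record.getMessage()
        if len(msg) > self.max_len:
            # Keep only the head of the message plus a marker of how much was cut
            record.msg = msg[:self.max_len] + f"...[truncated {len(msg) - self.max_len} chars]"
            record.args = ()  # message is already fully formatted
        return True  # always let the record through, just shortened

logger = logging.getLogger("driver-order-service")
logger.addFilter(TruncateLongMessages(max_len=2000))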


Original article: https://blog.csdn.net/bxp1321/article/details/144369386
