Using Python to find oversized log entries in Elasticsearch, then compressing the logs selectively
Introduction
As systems grow, so does the volume of logs they produce; many services now write more than 1 TB of logs per day. To get this under control, we first need to measure which log entries are oversized, and then optimize those specific entries.
Python script
from elasticsearch import Elasticsearch
import datetime

index_name = "driver-order-service-pro-*"  # replace with your index name
fileName = "driverOrderBigLog.txt"
# Replace with the time window you want to sample
start_date = datetime.datetime(2024, 12, 9, 17, 0, 0)
end_date = datetime.datetime(2024, 12, 9, 18, 0, 0)
maxPages = 1000  # maximum number of pages to fetch; the time window plus the page count defines the sample
# @timestamp is stored in UTC, so shift the local (UTC+8) times back by 8 hours
start = (start_date - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
end = (end_date - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def paginated_search(es_client, index_name, query, page_size=10, max_pages=None):
    """
    Page through Elasticsearch hits and write oversized log messages to a file.
    :param es_client: Elasticsearch client instance
    :param index_name: index (or index pattern) to query
    :param query: query DSL dictionary
    :param page_size: number of records per page, default 10
    :param max_pages: maximum number of pages to fetch, default None (fetch all pages)
    """
    page_number = 1
    search_after = None
    while True:
        search_body = {
            "query": query,
            "size": page_size,
            "_source": True,  # optional; restrict this to the fields you actually need
            "sort": [{"_id": {"order": "asc"}}]  # search_after needs a stable sort field; _id is used here
        }
        if search_after:
            # resume from the sort values of the last hit on the previous page
            search_body["search_after"] = search_after
        try:
            response = es_client.search(index=index_name, body=search_body)
            hits = response['hits']['hits']
            if not hits:
                break
            for hit in hits:
                result = hit['_source']['message']
                # Deduplication could be added here, e.g. skip messages that share a
                # prefix with one already written, or that contain a given keyword.
                if len(result) > 5000:
                    with open(fileName, 'a+', encoding='utf-8') as f2:
                        f2.write(result + "\n\n")
            # advance the pagination cursor and the page counter
            search_after = hits[-1]['sort']
            page_number += 1
            # stop once the maximum number of pages has been fetched
            if max_pages and page_number > max_pages:
                break
        except Exception as e:
            print(f"Error occurred during search: {e}")
            break
# Example usage
if __name__ == "__main__":
    # replace with your Elasticsearch address and credentials
    es = Elasticsearch(hosts="http://X.X.X.X:9200/", http_auth=('userName', 'password'))
    query = {
        "bool": {
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "format": "strict_date_optional_time",
                            "gte": start,
                            "lte": end
                        }
                    }
                }
            ],
            "must": [],
            "must_not": [],
            "should": []
        }
    }
    paginated_search(es, index_name, query, page_size=10, max_pages=maxPages)
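The comment in the loop above mentions deduplicating similar messages. As a minimal sketch of that idea (the helper name summarize_big_logs, the 100-character prefix length, and the top-20 cutoff are illustrative choices, not part of the original script), the collected file can be grouped by message prefix to show which log statements produce oversized output most often:

from collections import Counter

def summarize_big_logs(path="driverOrderBigLog.txt", prefix_len=100):
    """Group the collected oversized messages by a fixed-length prefix and count them."""
    counter = Counter()
    with open(path, encoding="utf-8") as f:
        # Messages were written separated by blank lines, so split on "\n\n".
        for message in f.read().split("\n\n"):
            if message.strip():
                counter[message[:prefix_len]] += 1
    # The most frequent prefixes are the best compression targets.
    for prefix, count in counter.most_common(20):
        print(f"{count:>6}  {prefix}")

summarize_big_logs()

The prefixes with the highest counts point at the log statements worth changing first.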
Follow-up
The script above dumps the oversized log entries into a file. With that list in hand, handle those log statements inside the service itself, for example by not printing them at all, or by printing only the key information; that alone achieves the log compression.
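What "only print the key information" looks like depends on the service's logging stack, which the post does not show; the following is only a minimal Python sketch, assuming the standard logging module, a 5000-character threshold matching the query script, and an illustrative filter name TruncateLongMessages:

import logging

MAX_LOG_LEN = 5000  # same threshold used when collecting oversized messages

class TruncateLongMessages(logging.Filter):
    """Truncate any record whose rendered message exceeds MAX_LOG_LEN characters."""
    def filter(self, record):
        msg = record.getMessage()
        if len(msg) > MAX_LOG_LEN:
            record.msg = msg[:MAX_LOG_LEN] + "...[truncated]"
            record.args = None  # the message is already fully rendered
        return True

logger = logging.getLogger("driver-order-service")
logger.addFilter(TruncateLongMessages())

Attaching such a filter (or the equivalent in the service's own logging framework) keeps the useful head of each message while cutting off the long tail that inflates the daily log volume.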
Original article: https://blog.csdn.net/bxp1321/article/details/144369386