自学内容网 自学内容网

Python实现Java mybatis-plus 产生的SQL自动化测试SQL速度和判断SQL是否走索引

Python实现Java mybatis-plus 产生的SQL自动化测试SQL速度和判断SQL是否走索引

文件目录如下

│  sql_speed_test.py
│  
├─input
│      data-report_in_visit_20240704.log
│      resource_in_sso_20240704.log
│      
└─output
        data-report_in_visit_20240704.csv
        resource_in_sso_20240704.csv

目前每次做实验都要将Java中的SQL做性能测试,否则就没法通过考核,属实难崩。

sql_speed_test.py是我用python写的程序,将从Java mybatis-plus控制台产生的日志复制到data-report_in_visit_20240704.logresource_in_sso_20240704.log文件中,运行程序之后output文件夹会自动输出csv文件,下面为csv文件夹详情。

data-report_in_visit_20240704.log文件

-- 301 -- [2024-07-04 14:22:55.055] [org.apache.ibatis.logging.jdbc.BaseJdbcLogger] [http-nio-9006-exec-2] [143] [DEBUG] [traceId:] ==>
SELECT resource_id AS resource_id, resource_type AS resource_type, sum(IFNULL(internal_pv, pv)) AS internal_pv, sum(IFNULL(google_pv, pv)) AS google_pv, sum(IFNULL(other_pv, pv)) AS other_pv, sum(IFNULL(pv, 0)) AS pv FROM t_bw_article_daily_statistic where resource_type = 9 GROUP BY resource_id, resource_type order by resource_id, resource_type
-- 302 -- [2024-07-04 14:22:55.055] [org.apache.ibatis.logging.jdbc.BaseJdbcLogger] [http-nio-9006-exec-2] [143] [DEBUG] [traceId:] ==>
SELECT resource_id AS resource_id, resource_type AS resource_type, sum(IFNULL(internal_pv, pv)) AS internal_pv, sum(IFNULL(google_pv, pv)) AS google_pv, sum(IFNULL(other_pv, pv)) AS other_pv, sum(IFNULL(pv, 0)) AS pv FROM t_bw_article_daily_statistic_202406 where resource_type = 9 GROUP BY resource_id, resource_type order by resource_id, resource_type

data-report_in_visit_20240704.csv文件

序号SQL功能描述SQL预估业务数据量实际测试数据量执行时间执行结果索引是否生效所属项目所在库
1SELECT resource_id AS resource_id, resource_type AS resource_type, sum(IFNULL(internal_pv, pv)) AS internal_pv, sum(IFNULL(google_pv, pv)) AS google_pv, sum(IFNULL(other_pv, pv)) AS other_pv, sum(IFNULL(pv, 0)) AS pv FROM t_bw_article_daily_statistic where resource_type = 9 GROUP BY resource_id, resource_type order by resource_id, resource_typet_bw_article_daily_statistict_bw_article_daily_statistic:5741行0.419603 秒462data-reportvisit
2SELECT resource_id AS resource_id, resource_type AS resource_type, sum(IFNULL(internal_pv, pv)) AS internal_pv, sum(IFNULL(google_pv, pv)) AS google_pv, sum(IFNULL(other_pv, pv)) AS other_pv, sum(IFNULL(pv, 0)) AS pv FROM t_bw_article_daily_statistic_202406 where resource_type = 9 GROUP BY resource_id, resource_type order by resource_id, resource_typet_bw_article_daily_statistic_202406t_bw_article_daily_statistic_202406:296358行0.291532 秒2691data-reportvisit

sql_speed_test.py文件

import os
import csv
import re
from pymysql import *
import time

"""
开发一个用于SQL性能测试的工具,避免一直做重复工作
将Java中的每一条SQL采用mybatis-plus-plugin插件抓出来转存至log文件中
抓住每条SQL做测试
"""


def extract_query_sql_table_names(sql):
    # 正则表达式模式
    pattern = re.compile(
        r"\b(?:FROM|JOIN|INTO|UPDATE|TABLE|INTO)\s+([`\"\[\]]?[a-zA-Z_][\w$]*[`\"\[\]]?)",
        re.IGNORECASE
    )

    # 查找所有匹配的表名
    matches = pattern.findall(sql)

    # 去掉引号和方括号
    tables = [re.sub(r'[`"\[\]]', '', match) for match in matches]
    filter_tables = []
    for table in tables:
        if "t_bw" in table:
            filter_tables.append(table)

    return filter_tables


def check_index_usage(connection, sql):
    if not sql.startswith("select") and not sql.startswith("SELECT"):
        return False
    try:
        with connection.cursor() as cursor:
            # 使用 EXPLAIN 来获取查询计划
            explain_sql = f"EXPLAIN {sql}"
            cursor.execute(explain_sql)
            explain_result = cursor.fetchall()

            # 打印 EXPLAIN 结果
            print("EXPLAIN 结果:")
            for row in explain_result:
                print(row)

            # 检查每行是否使用了索引
            for row in explain_result:
                if row[5] is not None:
                    print(f"SQL 使用了索引: {row[5]}")
                    return True

            print("SQL 未使用索引")
            return False
    finally:
        pass
        # connection.close()


def count_rows(connection, table_name):
    try:
        with connection.cursor() as cursor:
            # 构建 SQL 语句
            sql = f"SELECT COUNT(*) FROM {table_name}"

            # 执行 SQL 语句
            cursor.execute(sql)

            # 获取结果
            result = cursor.fetchone()

            # 返回结果
            return result[0]
    except Exception as e:
        print(e)
    finally:
        # connection.close()
        pass


class SqlSpeedTest:
    def __init__(self):
        self.input_dir = "./input"
        self.output_dir = "./output"
        self.databases = {
            "sso": {
                "ip": "xxxxxx",
                "database": "sso",
                "username": "xxxx",
                "password": "xxxx"
            }
        }

    def get_all_input_files(self):
        files = os.listdir(r'./input')
        # file_paths = []
        # for file in files:
        #     file_paths.append(self.input_dir + "/" + file)

        return files

    def handle_sql_log(self, project, database, lines):
        sql_lines = []
        row_count = 1
        database_info = self.databases.get(database)
        conn = connect(host=database_info.get("ip"),
                       port=3306,
                       user=database_info.get("username"),
                       password=database_info.get("password"),
                       database=database_info.get("database"),
                       charset='utf8mb4')

        for index, line in enumerate(lines):
            if line.startswith("--"):
                continue

            current_sql = line.replace("\n", "")
            execute_info = self.execute_sql_and_get_execute_time(conn, database, current_sql)

            tables = extract_query_sql_table_names(current_sql)
            real_rows = ""
            for table in tables:
                total_rows = count_rows(conn, table)
                real_rows += f"{table}:{total_rows}行 "

            sql_line = {
                "row_count": row_count,
                "sql_description": "",
                "sql": current_sql,
                "expect_rows": ",".join(tables),
                "real_rows": real_rows,
                "execute_time": execute_info["execute_time"],
                "execute_rows": execute_info["execute_rows"],
                "index_has_work": execute_info["index_has_work"],
                "project": project,
                "project": project,
                "database": database
            }

            sql_lines.append(sql_line)
            row_count += 1

        conn.close()
        return sql_lines

    def execute_sql_and_get_execute_time(self, conn, database, sql):
        print(f"==================> {database}库正在执行SQL: {sql}")
        # 记录开始时间
        try:
            cs = conn.cursor()  # 获取光标

            start_time = time.time()
            cs.execute(sql)
            rows = cs.fetchall()
            # 记录结束时间
            end_time = time.time()
            # 计算执行时间
            execution_time = end_time - start_time
            conn.commit()
            print(f"======>{database}库共花费{execution_time:.6f}秒执行完毕,{sql}")
        except Exception as e:
            print(e)
            return {"execute_rows": "", "execute_time": "", "index_has_work": ""}

        index_has_work = check_index_usage(conn, sql)
        return {"execute_rows": len(rows), "execute_time": f"{execution_time:.6f} 秒",
                "index_has_work": "是" if index_has_work else "否"}

    def handle_log_file(self, filename):

        with open(self.input_dir + "/" + filename, "r", encoding="utf-8") as file:
            lines = file.readlines()

        pre_filename = filename.split(".")[0]
        with open(self.output_dir + "/" + pre_filename + ".csv", "w", newline='', encoding='utf-8-sig') as f:
            writer = csv.writer(f,  quoting=csv.QUOTE_MINIMAL)
            csv_title = ["序号", "SQL功能描述", "SQL", "预估业务数据量", "实际测试数据量", "执行时间", "执行结果",
                         "索引是否生效", "所属项目", "所在库"]
            writer.writerow(csv_title)
            info = pre_filename.split("_in_")
            project_name = info[0]
            database_name = info[1].split("_")[0]
            sql_lines = self.handle_sql_log(project_name, database_name, lines)
            for sql_line in sql_lines:
                write_line = [sql_line["row_count"],
                              sql_line["sql_description"],
                              sql_line["sql"],
                              sql_line["expect_rows"],
                              sql_line["real_rows"],
                              sql_line["execute_time"],
                              sql_line["execute_rows"],
                              sql_line["index_has_work"],
                              sql_line["project"],
                              sql_line["database"]]
                writer.writerow(write_line)

    def do_work(self):
        files = self.get_all_input_files()
        for file in files:
            self.handle_log_file(file)


if __name__ == '__main__':
    sql_speed_test = SqlSpeedTest()
    sql_speed_test.do_work()

写在最后

编程精选网(www.codehuber.com),程序员的终身学习网站已上线!

如果这篇【文章】有帮助到你,希望可以给【JavaGPT】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【JavaGPT】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!


原文地址:https://blog.csdn.net/weixin_46294086/article/details/140647601

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!