自学内容网 自学内容网

mysql到doris的DDL整库转换工具

一、简介

适用于常规的mysql-ddl到doris-ddl的整库转换,其他需求可以修改代码实现。
需要链接到mysql,实际上我做过对比,解析DDL VS 直接查Mysql,解析DDL可能会有解析失败的情况,查询Mysql元数据的方式比较稳定可靠。

整体思路:
1.获取mysql元数据,
2.转换成doris的格式
3.套doris ddl模板拼起来

二、获取Mysql元数据

python获取Mysql元数据依赖pymysql包,因此要提前准备好包,这里不赘述

2.1 mysql连接器

# mysql连接
def mysql_connect(config):
    content = ''
    # MySQL连接
    return pymysql.connect(
        host=config['mysql']['host'],
        user=config['mysql']['user'],
        port=config['mysql']['port'],
        password=config['mysql']['password'],
        database=config['mysql']['database'],
    )

2.2获取所有表

有了连接器,就可以执行具体的sql
先用 show tables 快速的获取库里所有的表

# 获取库里所有的表
def mysql_table_list(mysql_conn):
    with mysql_conn.cursor() as cursor:
        cursor.execute("SHOW TABLES")
        return [row[0] for row in cursor.fetchall()]

2.3表元数据

接着遍历所有的表,依次获取每个表的元数据。
这里常用的sql有:

desc table_name;
SELECT *  FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'database_name' AND TABLE_NAME = 'table_name';
show full columns from table_name  ;

这里三个sql语句我做了简单的对比
第一个:内容太少了,会丢失字段注释
第二个:内容很全
第三个:东西比较多,满足使用

这里使用第三种,本质是后面两个都可以

# 获取表字段的所有信息
def describe_table(mysql_conn,table_name):
    with mysql_conn.cursor() as cursor:
        cursor.execute(f"show full columns from {table_name}")
        return cursor.fetchall()

2.4 表注释

到此一张表的所有字段信息都有了,不过还缺一个 表注释
从 information_schema.TABLES 里获取表的注释

# 表注释
def mysql_table_comment(mysql_conn,table_name,database_name):
    with mysql_conn.cursor() as cursor:
        cursor.execute(f"SELECT table_name , table_comment  FROM information_schema.TABLES WHERE table_schema='{database_name}' and table_name = '{table_name}'")
        # return cursor.fetchone()
        return [row[1] for row in cursor.fetchall()]

到此,一张表的所有信息都有了,就要开始转成适用于doris类型

三、转换适用于doris

3.1 类型转换

mysql -> doris 有一些类型需要做映射,下面只是列举了一切常用的

# 将MySQL类型转换为doris类型
def convert_type(mysql_type):
    mysql_type = mysql_type.lower()
    if 'bigint' in mysql_type:
        return 'BIGINT'
    elif 'int' in mysql_type:
        return 'INT'
    elif  mysql_type.startswith('varchar'):
        length = mysql_type.split('(')[1].split(')')[0]
        return f'VARCHAR({int(length) * 3})'  # 考虑UTF-8编码
    elif 'datetime' in mysql_type:
        return 'DATETIME'
    elif mysql_type.startswith('char'):
        length = mysql_type.split('(')[1].split(')')[0]
        return f'CHAR({int(length) * 3})'
    elif 'date' in mysql_type:
        return 'DATE'
    elif 'timestamp' in mysql_type:
        return 'datetime'
    elif 'text' in mysql_type:
        return 'STRING'  # TEXT类型最大长度
    elif 'tinyint' in mysql_type:
        return 'TINYINT'
    else:
        return mysql_type.upper()  # 其他类型直接转换为大写

这里有个坑,顺序会造成类型匹配时穿透,bigint 需要放在 int类型前面,否则会匹配到int里。

3.2 key字段提取

我这里默认都使用doris的 UNIQUE模型,在转换时常用mysql的primary key来替代。 就需要做一下转换。

def uqune_key(fileds):
    unqune_key = []
    for filed in fileds:
        if filed[4] == 'PRI':
            unqune_key.append('`'+ filed[0]+'`')
    # 如无主键 默认第一个字段作为主键
    if len(unqune_key) == 0:
        unqune_key.append('`'+fileds[0][0]+'`')
    return unqune_key

这里的fileds 就来自上面的 describe_table 方法 。

3.3 拼DDL

到这里就可以把整体的建表语句拼接出来了

doris_table_name = conf['doris']['table_prefix'] + table_name + conf['doris']['table_suffix']
    uqune_key = ','.join(uqune_key_list)
    ddl = 'CREATE TABLE ' + doris_table_name + ' ( \n' + doris_columns_str + ') \n'+  'UNIQUE KEY('+uqune_key + ') \n'
    if table_comment is not None:
        ddl += 'COMMENT \'' + table_comment + '\' \n'
    ddl += 'DISTRIBUTED BY HASH(' + uqune_key_list[0] + ') BUCKETS 10' +  ' \n'
    ddl += 'PROPERTIES (\n' + '"replication_num" = "1",\n' + '"in_memory" = "false",\n' + '"storage_format" = "DEFAULT"\n' + '); \n\n'
    return ddl

我这里模版使用的是单副本 。

四、整体代码

以下是整体代码

import json
import pymysql
import re


# 读取JSON文件
def read_and_format_json(file_path):
    # 读取文件内容
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
    # 将内容转换为Python字典
    return json.loads(content)

def mysql_connect(config):
    content = ''
    # MySQL连接
    return pymysql.connect(
        host=config['mysql']['host'],
        user=config['mysql']['user'],
        port=config['mysql']['port'],
        password=config['mysql']['password'],
        database=config['mysql']['database'],
    )
def mysql_table_list(mysql_conn):
    with mysql_conn.cursor() as cursor:
        cursor.execute("SHOW TABLES")
        return [row[0] for row in cursor.fetchall()]


def describe_table(mysql_conn,table_name):
    with mysql_conn.cursor() as cursor:
        cursor.execute(f"show full columns from {table_name}")
        return cursor.fetchall()



# 表注释
def mysql_table_comment(mysql_conn,table_name,database_name):
    with mysql_conn.cursor() as cursor:
        cursor.execute(f"SELECT table_name , table_comment  FROM information_schema.TABLES WHERE table_schema='{database_name}' and table_name = '{table_name}'")
        return [row[1] for row in cursor.fetchall()]




# 将MySQL类型转换为doris类型
def convert_type(mysql_type):
    mysql_type = mysql_type.lower()
    if 'bigint' in mysql_type:
        return 'BIGINT'
    elif 'int' in mysql_type:
        return 'INT'
    elif  mysql_type.startswith('varchar'):
        length = mysql_type.split('(')[1].split(')')[0]
        return f'VARCHAR({int(length) * 3})'  # 考虑UTF-8编码
    elif 'datetime' in mysql_type:
        return 'DATETIME'
    elif mysql_type.startswith('char'):
        length = mysql_type.split('(')[1].split(')')[0]
        return f'CHAR({int(length) * 3})'
    elif 'date' in mysql_type:
        return 'DATE'
    elif 'timestamp' in mysql_type:
        return 'datetime'
    elif 'text' in mysql_type:
        return 'STRING'  # TEXT类型最大长度
    elif 'tinyint' in mysql_type:
        return 'TINYINT'
    else:
        return mysql_type.upper()  # 其他类型直接转换为大写

def column_default_value(default_value):
    if default_value == 'NULL':
        return 'DEFALUT NULL'
    elif default_value == 'CURRENT_TIMESTAMP':
        return 'CURRENT_TIMESTAMP'
    else:
        return default_value



def covert_ddl_column(fileds):

    doris_columns_list = []
    for field in fileds:
        # 0    1     2          3      4       5            6     7           8
        # name type  collation is_null is_key default_vaule extra privileges comment
        # 字段名 类型  非字符集  是否允许空 是否主键  默认值         额外信息     权限 注释
        column_type = convert_type(field[1])
        column = '`'+ field[0] + '`  ' + column_type
        if field[3] == 'YES':
            column += ' NULL '
        else:column+=' NOT NULL '
        if field[5] is not None and column_type != 'DATETIME' and column_type != 'DATE':
            if field[5] == 'NULL':
                column += ' DEFAULT NULL '
            else: column += ' DEFAULT \'' + field[5] + '\''
        if field[8] is not None:
            column += ' COMMENT \'' + field[8] + '\''

        # column += ' ,'
        doris_columns_list.append(column)
    doris_columns_str = ', \n '.join(doris_columns_list)
    return doris_columns_str
def uqune_key(fileds):
    unqune_key = []
    for filed in fileds:
        if filed[4] == 'PRI':
            unqune_key.append('`'+ filed[0]+'`')
    # 如无主键 默认第一个字段作为主键
    if len(unqune_key) == 0:
        unqune_key.append('`'+fileds[0][0]+'`')
    return unqune_key


def concat_ddl(conf,table_name,doris_columns_str,uqune_key_list,table_comment):
    doris_table_name = conf['doris']['table_prefix'] + table_name + conf['doris']['table_suffix']
    uqune_key = ','.join(uqune_key_list)
    ddl = 'CREATE TABLE ' + doris_table_name + ' ( \n' + doris_columns_str + ') \n'+  'UNIQUE KEY('+uqune_key + ') \n'
    if table_comment is not None:
        ddl += 'COMMENT \'' + table_comment + '\' \n'
    ddl += 'DISTRIBUTED BY HASH(' + uqune_key_list[0] + ') BUCKETS 10' +  ' \n'
    ddl += 'PROPERTIES (\n' + '"replication_num" = "1",\n' + '"in_memory" = "false",\n' + '"storage_format" = "DEFAULT"\n' + '); \n\n'
    return ddl




if __name__ == '__main__':
    json_data = read_and_format_json('conf.json')
    mysql_conn = mysql_connect(json_data)
    tables = mysql_table_list(mysql_conn)
    with open('doris_ddl.sql', 'w', encoding='utf-8') as f:
        for table_name in tables:
            print(table_name)
            fileds = describe_table(mysql_conn, table_name)
            doris_columns_str = covert_ddl_column(fileds)
            uqune_key_list = uqune_key(fileds)
            table_comment = mysql_table_comment(mysql_conn,table_name,json_data['mysql']['database'])
            doris_ddl = concat_ddl(json_data,table_name,doris_columns_str,uqune_key_list,table_comment[0])
            f.write(doris_ddl)
            


最后补一个conf.json的格式

{
    "mysql": {
        "host": "",
        "port": ,
        "user": "",
        "password": "",
        "database": ""
    },
    "doris": {
        "database": "ods_project_develop",
        "table_prefix": "ods_project_develop_",
        "table_suffix": "_i_daily"
    }

原文地址:https://blog.csdn.net/weixin_45399602/article/details/143423780

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!