MySQL常用脚本_select生成insert迁移语句

适用场景

实现

代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys
import argparse

def parse_create_table(create_table_sql):
    """解析CREATE TABLE语句,提取列名和数据类型"""
    # 移除注释和多余空白
    sql = re.sub(r'--.*?\n|/\*.*?\*/', '', create_table_sql, flags=re.DOTALL)
    sql = re.sub(r'\s+', ' ', sql).strip()
    
    # 提取表名
    table_name_match = re.search(r'CREATE\s+TABLE\s+(?:`?([^`]+)`?|([^\s(]+))', sql, re.IGNORECASE)
    table_name = table_name_match.group(1) or table_name_match.group(2) if table_name_match else None
    
    # 提取列定义
    columns_part = re.search(r'\((.+)\)\s*(?:ENGINE|;|$)', sql, re.IGNORECASE | re.DOTALL)
    if not columns_part:
        return table_name, {}
    
    columns_text = columns_part.group(1)
    
    # 分割列定义,处理逗号但忽略括号内的逗号
    column_defs = []
    bracket_level = 0
    current_def = ""
    
    for char in columns_text:
        if char == '(' or char == '[':
            bracket_level += 1
        elif char == ')' or char == ']':
            bracket_level -= 1
        
        if char == ',' and bracket_level == 0:
            column_defs.append(current_def.strip())
            current_def = ""
        else:
            current_def += char
    
    if current_def.strip():
        column_defs.append(current_def.strip())
    
    # 解析每个列定义
    columns = {}
    for col_def in column_defs:
        # 跳过主键、外键等非列定义
        if re.match(r'^(?:PRIMARY|UNIQUE|FOREIGN)\s+KEY|^KEY|^INDEX|^CONSTRAINT', col_def, re.IGNORECASE):
            continue
        
        # 提取列名和数据类型
        col_match = re.match(r'`?([^`\s]+)`?\s+([^\s,]+)', col_def)
        if col_match:
            col_name = col_match.group(1)
            data_type = col_match.group(2).upper()
            columns[col_name] = data_type
    
    return table_name, columns

def generate_select_for_insert(table_name, columns_dict, selected_columns=None, oracle_compatible=False):
    """生成用于导出INSERT语句的SELECT语句"""
    if not table_name:
        return "错误:无法解析表名"
    
    # 如果没有指定列,使用所有列
    if not selected_columns:
        selected_columns = list(columns_dict.keys())
    else:
        # 确保所有指定的列都存在
        for col in selected_columns:
            if col not in columns_dict:
                return f"错误:列 '{col}' 不存在于表定义中"
    
    # 构建SELECT语句
    select_parts = []
    for col in selected_columns:
        data_type = columns_dict.get(col, '').upper()
        
        # 根据数据类型和Oracle兼容性处理列
        if oracle_compatible:
            if 'DATE' in data_type or 'TIME' in data_type:
                if 'TIMESTAMP' in data_type or ('DATE' in data_type and 'TIME' in data_type):
                    select_parts.append(f"CONCAT('TO_DATE(\'', DATE_FORMAT({col}, '%Y-%m-%d %H:%i:%s'), '\', \'YYYY-MM-DD HH24:MI:SS\')')")
                else:
                    select_parts.append(f"CONCAT('TO_DATE(\'', DATE_FORMAT({col}, '%Y-%m-%d'), '\', \'YYYY-MM-DD\')')")
            elif 'BOOL' in data_type or data_type == 'TINYINT(1)':
                select_parts.append(col)  # Oracle没有布尔类型,使用数字
            else:
                # 字符串类型需要添加引号和转义
                if any(t in data_type for t in ['CHAR', 'TEXT', 'VARCHAR', 'ENUM', 'SET']):
                    select_parts.append(f"CONCAT('\'', REPLACE({col}, '\'', '\'\'\''), '\'')")
                else:
                    select_parts.append(col)
        else:
            # MySQL格式
            if any(t in data_type for t in ['CHAR', 'TEXT', 'VARCHAR', 'ENUM', 'SET']):
                select_parts.append(f"CONCAT('\'', REPLACE({col}, '\'', '\'\'\''), '\'')")
            elif 'DATE' in data_type or 'TIME' in data_type:
                select_parts.append(f"CONCAT('\'', DATE_FORMAT({col}, '%Y-%m-%d %H:%i:%s'), '\'')")
            elif data_type == 'TINYINT(1)' or 'BOOL' in data_type:
                select_parts.append(col)
            else:
                select_parts.append(f"IFNULL({col}, 'NULL')")
    
    # 构建完整的SELECT语句
    columns_str = ', '.join(select_parts)
    
    if oracle_compatible:
        # Oracle格式的INSERT语句
        table_name_upper = table_name.upper()
        return f"SELECT CONCAT('INSERT INTO {table_name_upper} VALUES (', {columns_str}, ');') AS insert_statement FROM {table_name};"
    else:
        # MySQL格式的INSERT语句
        return f"SELECT CONCAT('INSERT INTO `{table_name}` VALUES (', {columns_str}, ');') AS insert_statement FROM {table_name};"

def main():
    parser = argparse.ArgumentParser(description='生成从MySQL表导出INSERT语句的SQL')
    parser.add_argument('--table', '-t', required=True, help='MySQL表名')
    parser.add_argument('--create-sql', '-c', required=True, help='MySQL建表SQL语句或包含SQL的文件路径')
    parser.add_argument('--columns', '-l', help='需要导出的列名,用逗号分隔')
    parser.add_argument('--oracle', '-o', action='store_true', help='生成兼容Oracle的INSERT语句')
    
    args = parser.parse_args()
    
    # 读取CREATE TABLE SQL
    create_sql = args.create_sql
    if create_sql.endswith('.sql') or '/' in create_sql or '\\' in create_sql:
        try:
            with open(create_sql, 'r', encoding='utf-8') as f:
                create_sql = f.read()
        except Exception as e:
            print(f"错误:无法读取SQL文件 - {e}", file=sys.stderr)
            return 1
    
    # 解析列名
    selected_columns = None
    if args.columns:
        selected_columns = [col.strip() for col in args.columns.split(',')]
    
    # 解析CREATE TABLE语句
    table_name, columns_dict = parse_create_table(create_sql)
    
    # 如果未指定表名,使用从SQL解析出的表名
    if args.table != table_name and table_name:
        print(f"警告:SQL中的表名 '{table_name}' 与指定的表名 '{args.table}' 不匹配,将使用指定的表名。", file=sys.stderr)
        table_name = args.table
    elif not table_name:
        table_name = args.table
    
    # 生成SELECT语句
    select_sql = generate_select_for_insert(table_name, columns_dict, selected_columns, args.oracle)
    
    print(select_sql)
    return 0

if __name__ == '__main__':
    sys.exit(main())

执行:

# 基本用法
python3.9 generate_insert_sql.py --table 表名 --create-sql /path/to/create_table.sql 

# 从文件读取CREATE TABLE语句,生成兼容Oracle的INSERT语句
python3.9 generate_insert_sql.py --table 表名 --create-sql /path/to/create_table.sql --oracle

把mysql的insert脚本转成oralce兼容格式:

#!/bin/bash

# 检查参数
if [ $# -lt 1 ]; then
    echo "用法: $0 <mysql_sql_file> [output_file]"
    exit 1
fi

INPUT_FILE=$1
OUTPUT_FILE=${2:-"${INPUT_FILE%.sql}_oracle.sql"}

# 检查输入文件是否存在
if [ ! -f "$INPUT_FILE" ]; then
    echo "错误: 输入文件 '$INPUT_FILE' 不存在"
    exit 1
fi

echo "正在将MySQL INSERT语句转换为Oracle格式..."
echo "输入文件: $INPUT_FILE"
echo "输出文件: $OUTPUT_FILE"

# 创建临时文件
TMP_FILE=$(mktemp)

# 处理MySQL INSERT语句转换为Oracle格式
cat "$INPUT_FILE" | sed '  
# 将表名转换为大写并移除反引号
s/INSERT INTO `\([^`]*\)`/INSERT INTO \U\1\E/g

# 处理日期时间格式: 将MySQL格式 YYYY-MM-DD HH:MM:SS 转换为 Oracle格式 TO_DATE()
s/"\([0-9]\{4\}-[0-9]\{2\}-[0-9]\{2\} [0-9]\{2\}:[0-9]\{2\}:[0-9]\{2\}\)"/TO_DATE("\1", "YYYY-MM-DD HH24:MI:SS")/g

# 处理日期格式: 将MySQL格式 YYYY-MM-DD 转换为 Oracle格式 TO_DATE()
s/"\([0-9]\{4\}-[0-9]\{2\}-[0-9]\{2\}\)"/TO_DATE("\1", "YYYY-MM-DD")/g

# 处理布尔值: 0/1 转换为 0/1
# 注意: 这里我们保留数字格式,如果需要转为字符串可以修改为 "0"/"1"
s/,\([0-9]\+\),"\([^"]*\)",\([0-9]\+\),\([0-9]\+\),/,\1,"\2",\3,\4,/g

# 处理NULL值,确保格式一致
s/,NULL)/,NULL)/g

# 添加分号结束每条语句
s/;$/;/g
' > "$TMP_FILE"

# 添加Oracle特定的设置到输出文件开头
cat > "$OUTPUT_FILE" << 'EOL'
-- Oracle兼容设置
SET DEFINE OFF;
SET VERIFY OFF;
SET FEEDBACK OFF;
SET SERVEROUTPUT ON;

BEGIN
    EXECUTE IMMEDIATE 'PURGE RECYCLEBIN';
END;
/

EOL

# 将处理后的内容追加到输出文件
cat "$TMP_FILE" >> "$OUTPUT_FILE"

# 添加提交事务到输出文件结尾
echo "\n-- 提交事务\nCOMMIT;" >> "$OUTPUT_FILE"

# 清理临时文件
rm -f "$TMP_FILE"

echo "转换完成! Oracle兼容的SQL已保存到: $OUTPUT_FILE"

>> Home

51ak

2025/05/29

Categories: mysql 常用脚本 Tags: 基础

《数据库工作笔记》公众号
扫描上面的二维码,关注我的《数据库工作笔记》公众号