找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

249

积分

0

好友

29

主题
发表于 7 天前 | 查看: 25| 回复: 0

在企业数据运维与测试环境搭建中,经常需要进行数据库表级别的数据迁移。本文将分享一个在DB2数据库(同实例同库)环境下,使用Python实现从表A到表B的轻量级数据迁移脚本方案。该方案涵盖了从环境准备、核心工具类封装、迁移主脚本编写到实际运行与问题排查的全流程,适用于同库或跨库表迁移、轻量ETL(抽取与加载)以及测试环境数据快速同步等场景。

环境准备

在开始编写脚本前,需要确保以下环境与依赖已就绪。

系统与软件版本

  • 操作系统:CentOS 7.x (x86_64)
  • DB2版本:11.5 (已安装DB2客户端)
  • Python版本:3.7
  • Python依赖包ibm_db, ibm_db_sa, sqlalchemy==1.3.24, pandas==0.24.2

前置依赖安装

1. 安装DB2客户端

在CentOS 7系统上安装轻量版DB2客户端驱动,主要步骤如下:

# 1. 下载DB2 CLI驱动包并解压
mkdir -p /opt/ibm/db2_driver
tar -zxvf db2_cli_driver.tar.gz -C /opt/ibm/db2_driver

# 2. 配置环境变量(临时生效,永久生效需写入 ~/.bashrc)
export DB2CLI_DRIVER_PATH=/opt/ibm/db2_driver/lib64
export LD_LIBRARY_PATH=$DB2CLI_DRIVER_PATH:$LD_LIBRARY_PATH
2. 安装Python依赖

通过pip安装必要的Python库。

pip3 install ibm_db ibm_db_sa sqlalchemy==1.3.24 pandas==0.24.2

核心工具类封装:db_utils.py

为了提升代码复用性和解决DB2连接中的编码、版本兼容性问题,我们首先封装一个数据库工具类。关键点在于连接字符串中添加CODEPAGE参数(如1386对应GBK,1208对应UTF-8)以解决中文乱码问题。

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
数据库工具类:适配 DB2/MPP 数据库连接、查询与关闭
兼容 Python 3.7 + pandas 0.24.x + 旧版本 SQLAlchemy
"""
import ibm_db
import ibm_db_sa
import sqlalchemy
import pandas as pd
from pandas.io.sql import DatabaseError  # 适配旧版本 pandas 异常
import traceback
import sys

def getConnectionDb2(host, port, database, username, password):
    """
    创建 DB2/MPP 数据库连接(返回 SQLAlchemy 引擎)
    :param host: 数据库主机地址
    :param port: 数据库端口
    :param database: 数据库名
    :param username: 用户名
    :param password: 密码
    :return: SQLAlchemy 引擎对象
    :raises: Exception 连接失败时抛出异常
    """
    try:
        # 构建 DB2 连接字符串,添加编码设置
        conn_str = f"db2+ibm_db://{username}:{password}@{host}:{port}/{database};CODEPAGE=1386"
        # 创建SQLAlchemy引擎
        engine = sqlalchemy.create_engine(
            conn_str,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600,
            echo=False
        )
        # 测试连接
        db_conn = engine.raw_connection()
        if db_conn:
            db_conn.close()
        print(f"  ✅ 数据库连接测试成功:{host}:{port}/{database}")
        return engine
    except sqlalchemy.exc.SQLAlchemyError as e:
        error_msg = f"SQLAlchemy 连接失败: {str(e)}"
        raise Exception(error_msg)
    except ibm_db.Error as e:
        error_msg = f"DB2 驱动连接失败: {ibm_db.conn_errormsg()} | {str(e)}"
        raise Exception(error_msg)
    except Exception as e:
        error_msg = f"未知连接错误: {str(e)}"
        raise Exception(error_msg)

def query(conn, sql):
    """
    执行 SQL 查询并返回 pandas DataFrame(适配旧版本)
    :param conn: SQLAlchemy 引擎对象
    :param sql: 查询 SQL 语句
    :return: pandas DataFrame
    :raises: Exception 查询失败时抛出异常
    """
    db_conn = None
    try:
        print(f"  执行 SQL: {sql[:200]}..." if len(sql) > 200 else f"  执行 SQL: {sql}")
        # 手动获取原生连接以适配旧版本pandas
        db_conn = conn.raw_connection()
        df = pd.read_sql(
            sql=sql,
            con=db_conn,
            coerce_float=False,
            index_col=None
        )
        return df
    except DatabaseError as e:
        error_msg = f"pandas 查询失败: {str(e)}"
        raise Exception(error_msg)
    except sqlalchemy.exc.SQLAlchemyError as e:
        error_msg = f"SQL 执行失败: {str(e)}"
        raise Exception(error_msg)
    except Exception as e:
        error_msg = f"未知查询错误: {str(e)}"
        raise Exception(error_msg)
    finally:
        # 手动关闭原生连接,避免泄漏
        if db_conn:
            try:
                db_conn.close()
            except:
                pass

def closeConnection(conn):
    """
    关闭数据库连接(释放 SQLAlchemy 引擎连接池)
    :param conn: SQLAlchemy 引擎对象
    :return: None
    """
    if conn is None:
        return
    try:
        conn.dispose()
        print(f"  ✅ 数据库连接已关闭")
    except Exception as e:
        print(f"  ⚠ 关闭连接时警告: {str(e)}", file=sys.stderr)

if __name__ == "__main__":
    # 测试函数(需修改为实际配置)
    test_config = {
        "host": "10.1.1.38",
        "port": "25010",
        "database": "casb",
        "username": "db2inst1",
        "password": "你的密码"
    }
    conn = None
    try:
        print("开始测试数据库连接...")
        conn = getConnectionDb2(**test_config)
        df = query(conn, "SELECT 1 AS test_col FROM SYSIBM.SYSDUMMY1")
        print(f"测试查询结果:\n{df}")
        print("✅ 连接和查询测试全部成功!")
    except Exception as e:
        print(f"❌ 测试失败: {str(e)}")
        traceback.print_exc()
    finally:
        closeConnection(conn)

数据迁移主脚本:transfer_mpp_to_db2.py

主脚本负责控制“源表查询→数据清洗→目标表插入”的完整流程。核心难点在于规避旧版本Pandas的schema参数冲突,并处理数据库插入兼容性。这里采用DB2原生游标进行批量插入,有效解决了相关问题。

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# 从MPP数据库抽取数据到DB2数据库
import sys
import pandas as pd
import db_utils as util
import datetime

# ============================================
# 数据库连接配置(请根据实际情况修改)
# ============================================
# 源数据库(MPP)配置
MPP_HOST = '10.1.1.38'
MPP_PORT = '25010'
MPP_DATABASE = 'casb'
MPP_USERNAME = 'db2inst1'
MPP_PASSWORD = '你的密码'
MPP_SCHEMA = 'HMF'
MPP_TABLE = 'T_DWD_FACT_ZZPQ_PHASE5_CP_DETAIL_CPQ_TRANSFER'

# 目标数据库(DB2)配置
DB2_HOST = '10.1.1.38'
DB2_PORT = '25010'
DB2_DATABASE = 'casb'
DB2_USERNAME = 'db2inst1'
DB2_PASSWORD = '你的密码'
DB2_SCHEMA = 'HMF'
DB2_TABLE = 'T_DWD_FACT_ZZPQ_PHASE5_CP_DETAIL_TEST1206'

def main():
    """主函数:从MPP抽取数据到DB2"""
    print("=" * 60)
    print("开始数据迁移:从MPP到DB2")
    print("=" * 60)

    # 1. 连接数据库
    print("\n[1/4] 正在连接数据库...")
    conn_mpp = None
    conn_db2 = None
    try:
        conn_mpp = util.getConnectionDb2(MPP_HOST, MPP_PORT, MPP_DATABASE, MPP_USERNAME, MPP_PASSWORD)
        print(f"  ✓ MPP数据库连接成功")
        conn_db2 = util.getConnectionDb2(DB2_HOST, DB2_PORT, DB2_DATABASE, DB2_USERNAME, DB2_PASSWORD)
        print(f"  ✓ DB2数据库连接成功")
    except Exception as e:
        print(f"  ✗ 数据库连接失败: {str(e)}")
        # 故障排查建议...
        sys.exit(1)

    # 2. 从MPP查询源数据
    print(f"\n[2/4] 正在从MPP查询数据: {MPP_SCHEMA}.{MPP_TABLE}")
    try:
        # 尝试不同格式的表名(带引号、不带引号、全大写)以适应不同环境
        query_sql = f'SELECT * FROM "{MPP_SCHEMA}"."{MPP_TABLE}"'
        try:
            df_source = util.query(conn_mpp, query_sql)
        except Exception as e1:
            query_sql = f'SELECT * FROM {MPP_SCHEMA}.{MPP_TABLE}'
            try:
                df_source = util.query(conn_mpp, query_sql)
            except Exception as e2:
                query_sql = f'SELECT * FROM {MPP_SCHEMA.upper()}.{MPP_TABLE.upper()}'
                df_source = util.query(conn_mpp, query_sql)

        total_rows = len(df_source)
        print(f"  ✓ 查询成功,共 {total_rows} 行数据")
        if total_rows == 0:
            print("  ⚠ 警告:源表中没有数据")
            util.closeConnection(conn_mpp)
            util.closeConnection(conn_db2)
            return
        # 数据预览
        print("\n  数据预览(前5行):")
        print(df_source.head().to_string())
    except Exception as e:
        print(f"  ✗ 查询MPP数据失败: {str(e)}")
        sys.exit(1)

    # 3. 使用原生游标插入数据到DB2(核心,解决兼容性问题)
    print(f"\n[3/4] 正在插入数据到DB2: {DB2_SCHEMA}.{DB2_TABLE}")
    try:
        print("  正在插入数据...")
        # 获取原生DB2连接和游标
        db_conn = conn_db2.raw_connection()
        cursor = db_conn.cursor()
        # 生成插入SQL,字段加双引号确保大小写正确
        columns = df_source.columns.tolist()
        cols_str = ', '.join([f'"{col}"' for col in columns])
        placeholders = ', '.join(['?' for _ in columns])
        insert_sql = f'INSERT INTO "{DB2_SCHEMA}"."{DB2_TABLE}" ({cols_str}) VALUES ({placeholders})'
        # 处理数据中的NaN/None,转为DB2可识别的NULL
        data = []
        for row in df_source.itertuples(index=False, name=None):
            cleaned_row = tuple(None if pd.isna(x) else x for x in row)
            data.append(cleaned_row)
        # 批量执行插入
        cursor.executemany(insert_sql, data)
        db_conn.commit()  # 提交事务
        print(f"  ✓ 数据插入成功,共插入 {len(data)} 行")
        cursor.close()
        db_conn.close()
    except Exception as e:
        print(f"  ✗ 插入DB2数据失败: {str(e)}")
        if 'db_conn' in locals() and db_conn:
            db_conn.rollback()
        sys.exit(1)

    # 4. 验证迁移结果
    print(f"\n[4/4] 正在查询DB2数据验证: {DB2_SCHEMA}.{DB2_TABLE}")
    try:
        verify_sql = f'SELECT * FROM "{DB2_SCHEMA}"."{DB2_TABLE}" FETCH FIRST 10 ROWS ONLY'
        try:
            df_result = util.query(conn_db2, verify_sql)
        except Exception as e1:
            verify_sql = f'SELECT * FROM {DB2_SCHEMA}.{DB2_TABLE} FETCH FIRST 10 ROWS ONLY'
            df_result = util.query(conn_db2, verify_sql)

        result_rows = len(df_result)
        print(f"  ✓ 查询成功,共 {result_rows} 行数据")
        print("\n" + "=" * 60)
        print("查询结果(前10行):")
        print("=" * 60)
        pd.set_option('display.max_columns', None)
        pd.set_option('display.width', None)
        print(df_result.to_string(index=False))
        print("=" * 60)
    except Exception as e:
        print(f"  ✗ 查询DB2数据失败: {str(e)}")

    # 关闭连接
    print("\n正在关闭数据库连接...")
    util.closeConnection(conn_mpp)
    util.closeConnection(conn_db2)
    print("  ✓ 所有连接已关闭")
    print("\n" + "=" * 60)
    print("数据迁移完成!")
    print("=" * 60)

if __name__ == '__main__':
    start_time = datetime.datetime.now()
    print(f"开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    try:
        main()
    except Exception as e:
        print(f"\n程序执行出错: {str(e)}")
        sys.exit(1)
    end_time = datetime.datetime.now()
    elapsed = (end_time - start_time).total_seconds()
    print(f"\n结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"总耗时: {elapsed:.2f} 秒")
    sys.exit(0)

实际运行与验证

运行步骤

  1. 加载DB2环境变量:运行脚本前必须配置好客户端环境。
    source /home/db2inst1/sqllib/db2profile
    export LD_LIBRARY_PATH=/opt/ibm/db2/V11.5/lib64:$LD_LIBRARY_PATH
  2. 执行迁移脚本
    cd /root/pytest
    python3 transfer_mpp_to_db2.py

验证迁移结果

脚本运行完成后,可以直接登录DB2命令行验证数据是否已成功写入目标表。

-- 连接数据库并查询数据量
db2 connect to casb
db2 "SELECT COUNT(*) FROM HMF.T_DWD_FACT_ZZPQ_PHASE5_CP_DETAIL_TEST1206"

也可以使用任何数据库管理工具执行查询进行验证。下图展示了一个典型的SQL查询结果界面:

SQL查询结果验证界面

通过查询确认目标表的数据记录条数,是验证迁移是否成功的最直接方式。

常见问题与解决方案

在实际运行中,可能会遇到以下典型问题,请参照下表进行排查:

问题现象 可能原因 解决方案
ModuleNotFoundError: No module named 'db_utils' Python路径问题 确保db_utils.py与主脚本在同一目录下。
SQL0204N: 表不存在 表名或模式名大小写错误 使用db2 list tables for schema <SCHEMA_NAME>命令确认表名,并在SQL中正确使用引号。
schema参数冲突导致插入失败 旧版本Pandas的to_sql方法兼容性问题 采用本文提供的“原生游标插入”方案,绕过schema参数。
查询或插入数据出现中文乱码 数据库连接编码不匹配 db_utils.py的连接字符串中明确添加CODEPAGE参数,如;CODEPAGE=1386(GBK)或;CODEPAGE=1208(UTF-8)。

方案总结与扩展

本文提供的Python脚本方案核心优势在于其轻量、可复用和强兼容性。它不仅适用于“同实例同库”的表迁移,只需简单修改源和目标数据库的连接配置,即可轻松扩展为跨DB2实例或跨数据库的迁移任务。通过封装原生游标插入逻辑和编码适配,有效解决了特定环境下Python旧版本库与DB2的兼容性挑战。

可选的优化方向

对于生产环境或更复杂的场景,可以考虑以下优化点以使脚本更健壮:

  1. 配置抽离:将数据库连接参数、表名等信息抽离到独立的配置文件(如config.pyconfig.ini)中,便于管理和批量修改,避免硬编码。
  2. 增强日志记录:使用Python的logging模块,将程序运行状态、错误信息同时输出到控制台和日志文件,方便事后审计与排查问题。
  3. 支持大数据量分批插入:在插入数据环节,将大的数据集分割成多个批次(例如每批1000行)进行executemany操作,可以避免单次操作内存占用过高,并允许在失败时从断点恢复。



上一篇:事件驱动架构与领域事件:微服务一致性问题与DDD解耦实践
下一篇:网络安全专业大专生求职记:从外包面试到行业三巨头的坎坷之路
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 22:25 , Processed in 0.200991 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表