在企业数据运维与测试环境搭建中,经常需要进行数据库表级别的数据迁移。本文将分享一个在DB2数据库(同实例同库)环境下,使用Python实现从表A到表B的轻量级数据迁移脚本方案。该方案涵盖了从环境准备、核心工具类封装、迁移主脚本编写到实际运行与问题排查的全流程,适用于同库或跨库表迁移、轻量ETL(抽取与加载)以及测试环境数据快速同步等场景。
环境准备
在开始编写脚本前,需要确保以下环境与依赖已就绪。
系统与软件版本
- 操作系统:CentOS 7.x (x86_64)
- DB2版本:11.5 (已安装DB2客户端)
- Python版本:3.7
- Python依赖包:
ibm_db, ibm_db_sa, sqlalchemy==1.3.24, pandas==0.24.2
前置依赖安装
1. 安装DB2客户端
在CentOS 7系统上安装轻量版DB2客户端驱动,主要步骤如下:
# 1. 下载DB2 CLI驱动包并解压
mkdir -p /opt/ibm/db2_driver
tar -zxvf db2_cli_driver.tar.gz -C /opt/ibm/db2_driver
# 2. 配置环境变量(临时生效,永久生效需写入 ~/.bashrc)
export DB2CLI_DRIVER_PATH=/opt/ibm/db2_driver/lib64
export LD_LIBRARY_PATH=$DB2CLI_DRIVER_PATH:$LD_LIBRARY_PATH
2. 安装Python依赖
通过pip安装必要的Python库。
pip3 install ibm_db ibm_db_sa sqlalchemy==1.3.24 pandas==0.24.2
核心工具类封装:db_utils.py
为了提升代码复用性和解决DB2连接中的编码、版本兼容性问题,我们首先封装一个数据库工具类。关键点在于连接字符串中添加CODEPAGE参数(如1386对应GBK,1208对应UTF-8)以解决中文乱码问题。
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
数据库工具类:适配 DB2/MPP 数据库连接、查询与关闭
兼容 Python 3.7 + pandas 0.24.x + 旧版本 SQLAlchemy
"""
import ibm_db
import ibm_db_sa
import sqlalchemy
import pandas as pd
from pandas.io.sql import DatabaseError # 适配旧版本 pandas 异常
import traceback
import sys
def getConnectionDb2(host, port, database, username, password):
"""
创建 DB2/MPP 数据库连接(返回 SQLAlchemy 引擎)
:param host: 数据库主机地址
:param port: 数据库端口
:param database: 数据库名
:param username: 用户名
:param password: 密码
:return: SQLAlchemy 引擎对象
:raises: Exception 连接失败时抛出异常
"""
try:
# 构建 DB2 连接字符串,添加编码设置
conn_str = f"db2+ibm_db://{username}:{password}@{host}:{port}/{database};CODEPAGE=1386"
# 创建SQLAlchemy引擎
engine = sqlalchemy.create_engine(
conn_str,
pool_size=5,
max_overflow=10,
pool_recycle=3600,
echo=False
)
# 测试连接
db_conn = engine.raw_connection()
if db_conn:
db_conn.close()
print(f" ✅ 数据库连接测试成功:{host}:{port}/{database}")
return engine
except sqlalchemy.exc.SQLAlchemyError as e:
error_msg = f"SQLAlchemy 连接失败: {str(e)}"
raise Exception(error_msg)
except ibm_db.Error as e:
error_msg = f"DB2 驱动连接失败: {ibm_db.conn_errormsg()} | {str(e)}"
raise Exception(error_msg)
except Exception as e:
error_msg = f"未知连接错误: {str(e)}"
raise Exception(error_msg)
def query(conn, sql):
"""
执行 SQL 查询并返回 pandas DataFrame(适配旧版本)
:param conn: SQLAlchemy 引擎对象
:param sql: 查询 SQL 语句
:return: pandas DataFrame
:raises: Exception 查询失败时抛出异常
"""
db_conn = None
try:
print(f" 执行 SQL: {sql[:200]}..." if len(sql) > 200 else f" 执行 SQL: {sql}")
# 手动获取原生连接以适配旧版本pandas
db_conn = conn.raw_connection()
df = pd.read_sql(
sql=sql,
con=db_conn,
coerce_float=False,
index_col=None
)
return df
except DatabaseError as e:
error_msg = f"pandas 查询失败: {str(e)}"
raise Exception(error_msg)
except sqlalchemy.exc.SQLAlchemyError as e:
error_msg = f"SQL 执行失败: {str(e)}"
raise Exception(error_msg)
except Exception as e:
error_msg = f"未知查询错误: {str(e)}"
raise Exception(error_msg)
finally:
# 手动关闭原生连接,避免泄漏
if db_conn:
try:
db_conn.close()
except:
pass
def closeConnection(conn):
"""
关闭数据库连接(释放 SQLAlchemy 引擎连接池)
:param conn: SQLAlchemy 引擎对象
:return: None
"""
if conn is None:
return
try:
conn.dispose()
print(f" ✅ 数据库连接已关闭")
except Exception as e:
print(f" ⚠ 关闭连接时警告: {str(e)}", file=sys.stderr)
if __name__ == "__main__":
# 测试函数(需修改为实际配置)
test_config = {
"host": "10.1.1.38",
"port": "25010",
"database": "casb",
"username": "db2inst1",
"password": "你的密码"
}
conn = None
try:
print("开始测试数据库连接...")
conn = getConnectionDb2(**test_config)
df = query(conn, "SELECT 1 AS test_col FROM SYSIBM.SYSDUMMY1")
print(f"测试查询结果:\n{df}")
print("✅ 连接和查询测试全部成功!")
except Exception as e:
print(f"❌ 测试失败: {str(e)}")
traceback.print_exc()
finally:
closeConnection(conn)
数据迁移主脚本:transfer_mpp_to_db2.py
主脚本负责控制“源表查询→数据清洗→目标表插入”的完整流程。核心难点在于规避旧版本Pandas的schema参数冲突,并处理数据库插入兼容性。这里采用DB2原生游标进行批量插入,有效解决了相关问题。
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# 从MPP数据库抽取数据到DB2数据库
import sys
import pandas as pd
import db_utils as util
import datetime
# ============================================
# 数据库连接配置(请根据实际情况修改)
# ============================================
# 源数据库(MPP)配置
MPP_HOST = '10.1.1.38'
MPP_PORT = '25010'
MPP_DATABASE = 'casb'
MPP_USERNAME = 'db2inst1'
MPP_PASSWORD = '你的密码'
MPP_SCHEMA = 'HMF'
MPP_TABLE = 'T_DWD_FACT_ZZPQ_PHASE5_CP_DETAIL_CPQ_TRANSFER'
# 目标数据库(DB2)配置
DB2_HOST = '10.1.1.38'
DB2_PORT = '25010'
DB2_DATABASE = 'casb'
DB2_USERNAME = 'db2inst1'
DB2_PASSWORD = '你的密码'
DB2_SCHEMA = 'HMF'
DB2_TABLE = 'T_DWD_FACT_ZZPQ_PHASE5_CP_DETAIL_TEST1206'
def main():
"""主函数:从MPP抽取数据到DB2"""
print("=" * 60)
print("开始数据迁移:从MPP到DB2")
print("=" * 60)
# 1. 连接数据库
print("\n[1/4] 正在连接数据库...")
conn_mpp = None
conn_db2 = None
try:
conn_mpp = util.getConnectionDb2(MPP_HOST, MPP_PORT, MPP_DATABASE, MPP_USERNAME, MPP_PASSWORD)
print(f" ✓ MPP数据库连接成功")
conn_db2 = util.getConnectionDb2(DB2_HOST, DB2_PORT, DB2_DATABASE, DB2_USERNAME, DB2_PASSWORD)
print(f" ✓ DB2数据库连接成功")
except Exception as e:
print(f" ✗ 数据库连接失败: {str(e)}")
# 故障排查建议...
sys.exit(1)
# 2. 从MPP查询源数据
print(f"\n[2/4] 正在从MPP查询数据: {MPP_SCHEMA}.{MPP_TABLE}")
try:
# 尝试不同格式的表名(带引号、不带引号、全大写)以适应不同环境
query_sql = f'SELECT * FROM "{MPP_SCHEMA}"."{MPP_TABLE}"'
try:
df_source = util.query(conn_mpp, query_sql)
except Exception as e1:
query_sql = f'SELECT * FROM {MPP_SCHEMA}.{MPP_TABLE}'
try:
df_source = util.query(conn_mpp, query_sql)
except Exception as e2:
query_sql = f'SELECT * FROM {MPP_SCHEMA.upper()}.{MPP_TABLE.upper()}'
df_source = util.query(conn_mpp, query_sql)
total_rows = len(df_source)
print(f" ✓ 查询成功,共 {total_rows} 行数据")
if total_rows == 0:
print(" ⚠ 警告:源表中没有数据")
util.closeConnection(conn_mpp)
util.closeConnection(conn_db2)
return
# 数据预览
print("\n 数据预览(前5行):")
print(df_source.head().to_string())
except Exception as e:
print(f" ✗ 查询MPP数据失败: {str(e)}")
sys.exit(1)
# 3. 使用原生游标插入数据到DB2(核心,解决兼容性问题)
print(f"\n[3/4] 正在插入数据到DB2: {DB2_SCHEMA}.{DB2_TABLE}")
try:
print(" 正在插入数据...")
# 获取原生DB2连接和游标
db_conn = conn_db2.raw_connection()
cursor = db_conn.cursor()
# 生成插入SQL,字段加双引号确保大小写正确
columns = df_source.columns.tolist()
cols_str = ', '.join([f'"{col}"' for col in columns])
placeholders = ', '.join(['?' for _ in columns])
insert_sql = f'INSERT INTO "{DB2_SCHEMA}"."{DB2_TABLE}" ({cols_str}) VALUES ({placeholders})'
# 处理数据中的NaN/None,转为DB2可识别的NULL
data = []
for row in df_source.itertuples(index=False, name=None):
cleaned_row = tuple(None if pd.isna(x) else x for x in row)
data.append(cleaned_row)
# 批量执行插入
cursor.executemany(insert_sql, data)
db_conn.commit() # 提交事务
print(f" ✓ 数据插入成功,共插入 {len(data)} 行")
cursor.close()
db_conn.close()
except Exception as e:
print(f" ✗ 插入DB2数据失败: {str(e)}")
if 'db_conn' in locals() and db_conn:
db_conn.rollback()
sys.exit(1)
# 4. 验证迁移结果
print(f"\n[4/4] 正在查询DB2数据验证: {DB2_SCHEMA}.{DB2_TABLE}")
try:
verify_sql = f'SELECT * FROM "{DB2_SCHEMA}"."{DB2_TABLE}" FETCH FIRST 10 ROWS ONLY'
try:
df_result = util.query(conn_db2, verify_sql)
except Exception as e1:
verify_sql = f'SELECT * FROM {DB2_SCHEMA}.{DB2_TABLE} FETCH FIRST 10 ROWS ONLY'
df_result = util.query(conn_db2, verify_sql)
result_rows = len(df_result)
print(f" ✓ 查询成功,共 {result_rows} 行数据")
print("\n" + "=" * 60)
print("查询结果(前10行):")
print("=" * 60)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
print(df_result.to_string(index=False))
print("=" * 60)
except Exception as e:
print(f" ✗ 查询DB2数据失败: {str(e)}")
# 关闭连接
print("\n正在关闭数据库连接...")
util.closeConnection(conn_mpp)
util.closeConnection(conn_db2)
print(" ✓ 所有连接已关闭")
print("\n" + "=" * 60)
print("数据迁移完成!")
print("=" * 60)
if __name__ == '__main__':
start_time = datetime.datetime.now()
print(f"开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
try:
main()
except Exception as e:
print(f"\n程序执行出错: {str(e)}")
sys.exit(1)
end_time = datetime.datetime.now()
elapsed = (end_time - start_time).total_seconds()
print(f"\n结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"总耗时: {elapsed:.2f} 秒")
sys.exit(0)
实际运行与验证
运行步骤
- 加载DB2环境变量:运行脚本前必须配置好客户端环境。
source /home/db2inst1/sqllib/db2profile
export LD_LIBRARY_PATH=/opt/ibm/db2/V11.5/lib64:$LD_LIBRARY_PATH
- 执行迁移脚本:
cd /root/pytest
python3 transfer_mpp_to_db2.py
验证迁移结果
脚本运行完成后,可以直接登录DB2命令行验证数据是否已成功写入目标表。
-- 连接数据库并查询数据量
db2 connect to casb
db2 "SELECT COUNT(*) FROM HMF.T_DWD_FACT_ZZPQ_PHASE5_CP_DETAIL_TEST1206"
也可以使用任何数据库管理工具执行查询进行验证。下图展示了一个典型的SQL查询结果界面:

通过查询确认目标表的数据记录条数,是验证迁移是否成功的最直接方式。
常见问题与解决方案
在实际运行中,可能会遇到以下典型问题,请参照下表进行排查:
| 问题现象 |
可能原因 |
解决方案 |
ModuleNotFoundError: No module named 'db_utils' |
Python路径问题 |
确保db_utils.py与主脚本在同一目录下。 |
SQL0204N: 表不存在 |
表名或模式名大小写错误 |
使用db2 list tables for schema <SCHEMA_NAME>命令确认表名,并在SQL中正确使用引号。 |
schema参数冲突导致插入失败 |
旧版本Pandas的to_sql方法兼容性问题 |
采用本文提供的“原生游标插入”方案,绕过schema参数。 |
| 查询或插入数据出现中文乱码 |
数据库连接编码不匹配 |
在db_utils.py的连接字符串中明确添加CODEPAGE参数,如;CODEPAGE=1386(GBK)或;CODEPAGE=1208(UTF-8)。 |
方案总结与扩展
本文提供的Python脚本方案核心优势在于其轻量、可复用和强兼容性。它不仅适用于“同实例同库”的表迁移,只需简单修改源和目标数据库的连接配置,即可轻松扩展为跨DB2实例或跨数据库的迁移任务。通过封装原生游标插入逻辑和编码适配,有效解决了特定环境下Python旧版本库与DB2的兼容性挑战。
可选的优化方向
对于生产环境或更复杂的场景,可以考虑以下优化点以使脚本更健壮:
- 配置抽离:将数据库连接参数、表名等信息抽离到独立的配置文件(如
config.py或config.ini)中,便于管理和批量修改,避免硬编码。
- 增强日志记录:使用Python的
logging模块,将程序运行状态、错误信息同时输出到控制台和日志文件,方便事后审计与排查问题。
- 支持大数据量分批插入:在插入数据环节,将大的数据集分割成多个批次(例如每批1000行)进行
executemany操作,可以避免单次操作内存占用过高,并允许在失败时从断点恢复。