在量化策略开发中,稳定可靠的数据源是关键。当你常用的数据接口(例如akshare)出现不稳定或报错时,券商提供的量化终端QMT及其Python接口xquant是一个值得尝试的替代方案。本文将详细介绍如何从零开始,配置并使用QMT的xquant接口来获取股票历史行情数据。
一、 环境准备与配置
使用QMT的Python接口前,需要完成一些基础的准备工作。
- 开通权限与安装软件:首先,你需要联系开户的证券公司,申请开通QMT交易权限。开通后,券商会通过电子邮箱发送软件下载地址。请下载并安装QMT客户端软件。
-
下载Python库:启动QMT软件后,在“我的”页面找到并点击“下载Python库”。这会弹出一个“系统设置”窗口。

在“系统设置”窗口中,找到“Python库下载”区域,点击“下载Python库”按钮。这会下载QMT Python接口的核心模块。
- 配置本地Python环境:下载完成后,在QMT的安装目录下(通常为
.../bin.x64/lib/site-packages/)找到名为xtquant的文件夹。将这个文件夹复制到你本地Python环境的site-packages目录下(例如 C:\Users\YourName\AppData\Local\Programs\Python\Python39\Lib\site-packages\)。如果你的电脑尚未安装Python,需要先安装一个Python环境(推荐3.7及以上版本)。
二、 核心接口简介
配置好环境后,就可以开始使用xquant了。其主要模块有两个,官方文档(https://dict.thinktrader.net/nativeApi/start_now.html)是重要的参考资料。
xtdata (行情模块):提供精简直接的行情数据,包括历史和实时的K线、分笔数据,以及财务数据、合约基础信息、板块行业分类等通用行情数据。
Xttrader (交易模块):封装了策略交易所需的Python API,可以与MiniQMT客户端交互,执行报单、撤单、查询资产、委托、成交、持仓等操作,并能接收相关变动的推送消息。
本文主要聚焦于使用xtdata模块获取行情数据。
三、 实战:获取历史行情数据并保存为CSV
一个典型的数据获取流程是:先下载数据到QMT本地,再读取并转换为通用格式(如CSV)。下面是一个完整的示例代码,它演示了如何批量下载沪深A股的日线数据并保存。
登录QMT后,请确保在交易设置中勾选了“独立交易”模式。
核心步骤说明:
- 使用
download_history_data 函数将指定股票、指定周期的数据下载到QMT本地数据目录。
- 使用
get_local_data 函数读取已下载到本地的数据。
- 将读取的数据用
Pandas处理并保存为CSV文件。
以下是实现这一过程的Python类:
import os
import sys
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
from xtquant import xtdata
# 添加项目根目录到Python路径
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(project_root))
class BaseDataSync:
"""
基础数据同步接口
封装历史行情数据下载功能
"""
def __init__(self, data_storage_path=None):
"""
初始化基础数据同步接口
:param data_storage_path: 数据存储路径
"""
# 默认数据存储路径
if not data_storage_path:
self.data_storage_path = Path(__file__).resolve().parent.parent.parent / "data"
else:
self.data_storage_path = Path(data_storage_path)
# 基础数据存储路径
self.base_data_path = self.data_storage_path / "data_storage" / "base_data"
# 确保目录存在
self.base_data_path.mkdir(parents=True, exist_ok=True)
# 日志文件路径
self.log_path = self.data_storage_path.parent / "logs"
self.log_path.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_path / f"data_sync_{datetime.now().strftime('%Y%m%d')}.log"
def log(self, message, level="INFO"):
"""
记录日志
:param message: 日志消息
:param level: 日志级别
"""
log_message = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [{level}] {message}"
print(log_message)
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(log_message + '\n')
def download_history_data(self, stocks=None, start_date=None, end_date=None, period='1d'):
"""
下载历史行情数据到本地MiniQMT数据目录,并通过get_market_data_ex读取保存为CSV文件
:param stocks: 股票列表
:param start_date: 开始日期
:param end_date: 结束日期
:param period: 数据周期,如 '1d'(日线)、'1m'(1分钟线)等
:return: 下载结果
"""
try:
self.log(f"开始下载历史行情数据,周期: {period}")
# 默认日期范围
if not end_date:
end_date = datetime.now().strftime('%Y%m%d')
if not start_date:
start_date = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d')
# 默认股票列表
if not stocks:
# 获取沪深A股股票列表
stocks = xtdata.get_stock_list_in_sector('沪深A股') # 获取所有沪深A股股票
if not stocks:
self.log("获取股票列表失败", "ERROR")
return False
# 下载每只股票的历史数据
success_count = 0
fail_count = 0
for stock in stocks:
try:
self.log(f"正在下载 {stock} 的历史数据")
# 使用download_history_data下载数据
# 移除不支持的关键字参数
xtdata.download_history_data(
stock,
period,
start_date,
end_date
)
success_count += 1
self.log(f"成功下载 {stock} 的历史数据")
except Exception as e:
fail_count += 1
self.log(f"下载 {stock} 的历史数据失败: {str(e)}", "WARNING")
self.log(f"历史行情数据下载完成,成功: {success_count}, 失败: {fail_count}")
# 通过get_local_data读取数据并保存为CSV文件
if success_count > 0:
self.log("开始通过get_local_data读取数据并保存为CSV文件")
try:
# 定义需要的字段
fields = [] # 空列表表示获取所有字段
# 使用get_local_data获取数据
self.log(f"使用get_local_data获取 {len(stocks)} 只股票的数据")
try:
# 为每只股票单独获取数据
all_history_data = []
for stock in stocks:
try:
# 使用get_local_data获取本地数据
data = xtdata.get_local_data(
field_list=fields, # 字段列表,空表示所有字段
stock_list=[stock], # 股票列表(正确的参数名称)
period=period, # 周期
count=1 # 只获取最近1条数据
)
if data and stock in data:
# 转换为DataFrame
df_stock = pd.DataFrame(data[stock])
if not df_stock.empty:
# 添加股票代码和交易日期
df_stock['ts_code'] = stock
df_stock['trade_date'] = end_date
# 确保所有必要字段存在
required_fields = ['open', 'high', 'low', 'close', 'volume', 'amount', 'pre_close']
for field in required_fields:
if field not in df_stock.columns:
df_stock[field] = 0
# 只保留需要的列
df_stock = df_stock[['ts_code', 'trade_date', 'open', 'high', 'low', 'close', 'volume', 'amount', 'pre_close']]
# 添加到总数据中
all_history_data.append(df_stock)
self.log(f"成功获取 {stock} 的本地数据")
except Exception as e:
self.log(f"获取 {stock} 的本地数据失败: {str(e)}", "WARNING")
if all_history_data:
# 合并所有股票的数据
df = pd.concat(all_history_data, ignore_index=True)
# 生成文件名
file_name = f"history_data_{period}_{end_date}.csv"
output_file = self.base_data_path / file_name
# 保存为CSV文件
try:
# 尝试删除旧文件(如果存在)
if output_file.exists():
output_file.unlink()
# 保存新文件
df.to_csv(output_file, index=False, encoding='utf-8-sig')
self.log(f"历史行情数据保存成功,文件路径: {output_file}")
self.log(f"共保存 {len(df)} 条记录")
except Exception as e:
# 如果删除旧文件失败,使用时间戳生成新文件名
timestamp = datetime.now().strftime('%H%M%S')
file_name = f"history_data_{period}_{end_date}_{timestamp}.csv"
output_file = self.base_data_path / file_name
df.to_csv(output_file, index=False, encoding='utf-8-sig')
self.log(f"使用新文件名保存历史行情数据,文件路径: {output_file}")
self.log(f"共保存 {len(df)} 条记录")
else:
self.log("获取数据失败,无法保存为CSV文件", "WARNING")
# 即使获取数据失败,也生成一个包含股票代码的CSV文件
history_data = []
for stock in stocks:
history_data.append({
'ts_code': stock,
'trade_date': end_date,
'open': 0,
'high': 0,
'low': 0,
'close': 0,
'volume': 0,
'amount': 0,
'pre_close': 0
})
df = pd.DataFrame(history_data)
file_name = f"history_data_{period}_{end_date}.csv"
output_file = self.base_data_path / file_name
try:
# 尝试删除旧文件(如果存在)
if output_file.exists():
output_file.unlink()
# 保存新文件
df.to_csv(output_file, index=False, encoding='utf-8-sig')
self.log(f"生成了包含股票代码的CSV文件,文件路径: {output_file}")
except Exception as e:
# 如果删除旧文件失败,使用时间戳生成新文件名
timestamp = datetime.now().strftime('%H%M%S')
file_name = f"history_data_{period}_{end_date}_{timestamp}.csv"
output_file = self.base_data_path / file_name
df.to_csv(output_file, index=False, encoding='utf-8-sig')
self.log(f"使用新文件名生成了包含股票代码的CSV文件,文件路径: {output_file}")
except Exception as e:
self.log(f"调用get_local_data失败: {str(e)}", "WARNING")
# 即使API调用失败,也生成一个包含股票代码的CSV文件
history_data = []
for stock in stocks:
history_data.append({
'ts_code': stock,
'trade_date': end_date,
'open': 0,
'high': 0,
'low': 0,
'close': 0,
'volume': 0,
'amount': 0,
'pre_close': 0
})
df = pd.DataFrame(history_data)
file_name = f"history_data_{period}_{end_date}.csv"
output_file = self.base_data_path / file_name
try:
# 尝试删除旧文件(如果存在)
if output_file.exists():
output_file.unlink()
# 保存新文件
df.to_csv(output_file, index=False, encoding='utf-8-sig')
self.log(f"生成了包含股票代码的CSV文件,文件路径: {output_file}")
except Exception as e:
# 如果删除旧文件失败,使用时间戳生成新文件名
timestamp = datetime.now().strftime('%H%M%S')
file_name = f"history_data_{period}_{end_date}_{timestamp}.csv"
output_file = self.base_data_path / file_name
df.to_csv(output_file, index=False, encoding='utf-8-sig')
self.log(f"使用新文件名生成了包含股票代码的CSV文件,文件路径: {output_file}")
except Exception as e:
self.log(f"保存历史行情数据为CSV文件失败: {str(e)}", "WARNING")
return success_count > 0
except Exception as e:
self.log(f"下载历史行情数据失败: {str(e)}", "ERROR")
return False
if __name__ == "__main__":
# 测试历史数据下载,使用系统当前日期
today = datetime.now().strftime('%Y%m%d')
sync = BaseDataSync()
result = sync.download_history_data(
start_date=today,
end_date=today,
period="1d"
)
print(f"下载结果: {result}")
运行上述代码后,你将在指定的数据目录中得到一个CSV文件,其中包含了所下载股票的日线行情数据。

成功跑通核心的历史数据接口后,你就可以怀着轻松愉快的心情,参照官方文档并借助AI辅助,逐个探索和测试xtdata模块的其他丰富功能了。例如:
- 获取交易日历
- 获取股票基本信息(列表、名称等)
- 获取板块信息及板块成分股
- 获取指数信息及其成分股
- 获取财务数据
四、 关于实时行情
需要注意的是,QMT默认提供的是Level-1行情数据。如果你需要更高频、信息更详细的Level-2行情(如十档买卖盘、逐笔成交等),通常需要向券商单独申请开通相关权限,这可能涉及额外的费用。
希望这篇教程能帮助你顺利打通QMT的数据获取链路。在云栈社区的Python板块,你还可以找到更多关于数据处理和量化策略开发的讨论与资源。