找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1618

积分

0

好友

206

主题
发表于 2026-2-12 01:41:54 | 查看: 35| 回复: 0

在量化策略开发中,稳定可靠的数据源是关键。当你常用的数据接口(例如akshare)出现不稳定或报错时,券商提供的量化终端QMT及其Python接口xquant是一个值得尝试的替代方案。本文将详细介绍如何从零开始,配置并使用QMT的xquant接口来获取股票历史行情数据。

一、 环境准备与配置

使用QMT的Python接口前,需要完成一些基础的准备工作。

  1. 开通权限与安装软件:首先,你需要联系开户的证券公司,申请开通QMT交易权限。开通后,券商会通过电子邮箱发送软件下载地址。请下载并安装QMT客户端软件。
  2. 下载Python库:启动QMT软件后,在“我的”页面找到并点击“下载Python库”。这会弹出一个“系统设置”窗口。

    QMT系统设置界面与Python库下载配置

    在“系统设置”窗口中,找到“Python库下载”区域,点击“下载Python库”按钮。这会下载QMT Python接口的核心模块。

  3. 配置本地Python环境:下载完成后,在QMT的安装目录下(通常为.../bin.x64/lib/site-packages/)找到名为xtquant的文件夹。将这个文件夹复制到你本地Python环境的site-packages目录下(例如 C:\Users\YourName\AppData\Local\Programs\Python\Python39\Lib\site-packages\)。如果你的电脑尚未安装Python,需要先安装一个Python环境(推荐3.7及以上版本)。

二、 核心接口简介

配置好环境后,就可以开始使用xquant了。其主要模块有两个,官方文档(https://dict.thinktrader.net/nativeApi/start_now.html)是重要的参考资料。

  • xtdata (行情模块):提供精简直接的行情数据,包括历史和实时的K线、分笔数据,以及财务数据、合约基础信息、板块行业分类等通用行情数据。
  • Xttrader (交易模块):封装了策略交易所需的Python API,可以与MiniQMT客户端交互,执行报单、撤单、查询资产、委托、成交、持仓等操作,并能接收相关变动的推送消息。

本文主要聚焦于使用xtdata模块获取行情数据。

三、 实战:获取历史行情数据并保存为CSV

一个典型的数据获取流程是:先下载数据到QMT本地,再读取并转换为通用格式(如CSV)。下面是一个完整的示例代码,它演示了如何批量下载沪深A股的日线数据并保存。

登录QMT后,请确保在交易设置中勾选了“独立交易”模式。

核心步骤说明

  1. 使用 download_history_data 函数将指定股票、指定周期的数据下载到QMT本地数据目录。
  2. 使用 get_local_data 函数读取已下载到本地的数据。
  3. 将读取的数据用Pandas处理并保存为CSV文件。

以下是实现这一过程的Python类:

import os
import sys
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
from xtquant import xtdata

# 添加项目根目录到Python路径
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(project_root))

class BaseDataSync:
    """
    基础数据同步接口
    封装历史行情数据下载功能
    """

    def __init__(self, data_storage_path=None):
        """
        初始化基础数据同步接口
        :param data_storage_path: 数据存储路径
        """
        # 默认数据存储路径
        if not data_storage_path:
            self.data_storage_path = Path(__file__).resolve().parent.parent.parent / "data"
        else:
            self.data_storage_path = Path(data_storage_path)

        # 基础数据存储路径
        self.base_data_path = self.data_storage_path / "data_storage" / "base_data"

        # 确保目录存在
        self.base_data_path.mkdir(parents=True, exist_ok=True)

        # 日志文件路径
        self.log_path = self.data_storage_path.parent / "logs"
        self.log_path.mkdir(parents=True, exist_ok=True)
        self.log_file = self.log_path / f"data_sync_{datetime.now().strftime('%Y%m%d')}.log"

    def log(self, message, level="INFO"):
        """
        记录日志
        :param message: 日志消息
        :param level: 日志级别
        """
        log_message = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [{level}] {message}"
        print(log_message)
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_message + '\n')

    def download_history_data(self, stocks=None, start_date=None, end_date=None, period='1d'):
        """
        下载历史行情数据到本地MiniQMT数据目录,并通过get_market_data_ex读取保存为CSV文件
        :param stocks: 股票列表
        :param start_date: 开始日期
        :param end_date: 结束日期
        :param period: 数据周期,如 '1d'(日线)、'1m'(1分钟线)等
        :return: 下载结果
        """
        try:
            self.log(f"开始下载历史行情数据,周期: {period}")

            # 默认日期范围
            if not end_date:
                end_date = datetime.now().strftime('%Y%m%d')
            if not start_date:
                start_date = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d')

            # 默认股票列表
            if not stocks:
                # 获取沪深A股股票列表
                stocks = xtdata.get_stock_list_in_sector('沪深A股')  # 获取所有沪深A股股票

            if not stocks:
                self.log("获取股票列表失败", "ERROR")
                return False

            # 下载每只股票的历史数据
            success_count = 0
            fail_count = 0

            for stock in stocks:
                try:
                    self.log(f"正在下载 {stock} 的历史数据")
                    # 使用download_history_data下载数据
                    # 移除不支持的关键字参数
                    xtdata.download_history_data(
                        stock,
                        period,
                        start_date,
                        end_date
                    )
                    success_count += 1
                    self.log(f"成功下载 {stock} 的历史数据")
                except Exception as e:
                    fail_count += 1
                    self.log(f"下载 {stock} 的历史数据失败: {str(e)}", "WARNING")

            self.log(f"历史行情数据下载完成,成功: {success_count}, 失败: {fail_count}")

            # 通过get_local_data读取数据并保存为CSV文件
            if success_count > 0:
                self.log("开始通过get_local_data读取数据并保存为CSV文件")
                try:
                    # 定义需要的字段
                    fields = []  # 空列表表示获取所有字段

                    # 使用get_local_data获取数据
                    self.log(f"使用get_local_data获取 {len(stocks)} 只股票的数据")
                    try:
                        # 为每只股票单独获取数据
                        all_history_data = []

                        for stock in stocks:
                            try:
                                # 使用get_local_data获取本地数据
                                data = xtdata.get_local_data(
                                    field_list=fields,  # 字段列表,空表示所有字段
                                    stock_list=[stock],  # 股票列表(正确的参数名称)
                                    period=period,  # 周期
                                    count=1  # 只获取最近1条数据
                                )

                                if data and stock in data:
                                    # 转换为DataFrame
                                    df_stock = pd.DataFrame(data[stock])

                                    if not df_stock.empty:
                                        # 添加股票代码和交易日期
                                        df_stock['ts_code'] = stock
                                        df_stock['trade_date'] = end_date

                                        # 确保所有必要字段存在
                                        required_fields = ['open', 'high', 'low', 'close', 'volume', 'amount', 'pre_close']
                                        for field in required_fields:
                                            if field not in df_stock.columns:
                                                df_stock[field] = 0

                                        # 只保留需要的列
                                        df_stock = df_stock[['ts_code', 'trade_date', 'open', 'high', 'low', 'close', 'volume', 'amount', 'pre_close']]

                                        # 添加到总数据中
                                        all_history_data.append(df_stock)
                                        self.log(f"成功获取 {stock} 的本地数据")
                            except Exception as e:
                                self.log(f"获取 {stock} 的本地数据失败: {str(e)}", "WARNING")

                        if all_history_data:
                            # 合并所有股票的数据
                            df = pd.concat(all_history_data, ignore_index=True)

                            # 生成文件名
                            file_name = f"history_data_{period}_{end_date}.csv"
                            output_file = self.base_data_path / file_name

                            # 保存为CSV文件
                            try:
                                # 尝试删除旧文件(如果存在)
                                if output_file.exists():
                                    output_file.unlink()
                                # 保存新文件
                                df.to_csv(output_file, index=False, encoding='utf-8-sig')
                                self.log(f"历史行情数据保存成功,文件路径: {output_file}")
                                self.log(f"共保存 {len(df)} 条记录")
                            except Exception as e:
                                # 如果删除旧文件失败,使用时间戳生成新文件名
                                timestamp = datetime.now().strftime('%H%M%S')
                                file_name = f"history_data_{period}_{end_date}_{timestamp}.csv"
                                output_file = self.base_data_path / file_name
                                df.to_csv(output_file, index=False, encoding='utf-8-sig')
                                self.log(f"使用新文件名保存历史行情数据,文件路径: {output_file}")
                                self.log(f"共保存 {len(df)} 条记录")
                        else:
                            self.log("获取数据失败,无法保存为CSV文件", "WARNING")
                            # 即使获取数据失败,也生成一个包含股票代码的CSV文件
                            history_data = []
                            for stock in stocks:
                                history_data.append({
                                    'ts_code': stock,
                                    'trade_date': end_date,
                                    'open': 0,
                                    'high': 0,
                                    'low': 0,
                                    'close': 0,
                                    'volume': 0,
                                    'amount': 0,
                                    'pre_close': 0
                                })
                            df = pd.DataFrame(history_data)
                            file_name = f"history_data_{period}_{end_date}.csv"
                            output_file = self.base_data_path / file_name
                            try:
                                # 尝试删除旧文件(如果存在)
                                if output_file.exists():
                                    output_file.unlink()
                                # 保存新文件
                                df.to_csv(output_file, index=False, encoding='utf-8-sig')
                                self.log(f"生成了包含股票代码的CSV文件,文件路径: {output_file}")
                            except Exception as e:
                                # 如果删除旧文件失败,使用时间戳生成新文件名
                                timestamp = datetime.now().strftime('%H%M%S')
                                file_name = f"history_data_{period}_{end_date}_{timestamp}.csv"
                                output_file = self.base_data_path / file_name
                                df.to_csv(output_file, index=False, encoding='utf-8-sig')
                                self.log(f"使用新文件名生成了包含股票代码的CSV文件,文件路径: {output_file}")
                    except Exception as e:
                        self.log(f"调用get_local_data失败: {str(e)}", "WARNING")
                        # 即使API调用失败,也生成一个包含股票代码的CSV文件
                        history_data = []
                        for stock in stocks:
                            history_data.append({
                                'ts_code': stock,
                                'trade_date': end_date,
                                'open': 0,
                                'high': 0,
                                'low': 0,
                                'close': 0,
                                'volume': 0,
                                'amount': 0,
                                'pre_close': 0
                            })
                        df = pd.DataFrame(history_data)
                        file_name = f"history_data_{period}_{end_date}.csv"
                        output_file = self.base_data_path / file_name
                        try:
                            # 尝试删除旧文件(如果存在)
                            if output_file.exists():
                                output_file.unlink()
                            # 保存新文件
                            df.to_csv(output_file, index=False, encoding='utf-8-sig')
                            self.log(f"生成了包含股票代码的CSV文件,文件路径: {output_file}")
                        except Exception as e:
                            # 如果删除旧文件失败,使用时间戳生成新文件名
                            timestamp = datetime.now().strftime('%H%M%S')
                            file_name = f"history_data_{period}_{end_date}_{timestamp}.csv"
                            output_file = self.base_data_path / file_name
                            df.to_csv(output_file, index=False, encoding='utf-8-sig')
                            self.log(f"使用新文件名生成了包含股票代码的CSV文件,文件路径: {output_file}")
                except Exception as e:
                    self.log(f"保存历史行情数据为CSV文件失败: {str(e)}", "WARNING")

            return success_count > 0
        except Exception as e:
            self.log(f"下载历史行情数据失败: {str(e)}", "ERROR")
            return False

if __name__ == "__main__":
    # 测试历史数据下载,使用系统当前日期
    today = datetime.now().strftime('%Y%m%d')
    sync = BaseDataSync()
    result = sync.download_history_data(
        start_date=today,
        end_date=today,
        period="1d"
    )
    print(f"下载结果: {result}")

运行上述代码后,你将在指定的数据目录中得到一个CSV文件,其中包含了所下载股票的日线行情数据。

使用QMT xquant接口导出的股票历史行情数据CSV表格

成功跑通核心的历史数据接口后,你就可以怀着轻松愉快的心情,参照官方文档并借助AI辅助,逐个探索和测试xtdata模块的其他丰富功能了。例如:

  • 获取交易日历
  • 获取股票基本信息(列表、名称等)
  • 获取板块信息及板块成分股
  • 获取指数信息及其成分股
  • 获取财务数据

四、 关于实时行情

需要注意的是,QMT默认提供的是Level-1行情数据。如果你需要更高频、信息更详细的Level-2行情(如十档买卖盘、逐笔成交等),通常需要向券商单独申请开通相关权限,这可能涉及额外的费用。

希望这篇教程能帮助你顺利打通QMT的数据获取链路。在云栈社区Python板块,你还可以找到更多关于数据处理和量化策略开发的讨论与资源。




上一篇:工控软件必备清单:从PLC编程到办公协作,一文汇总安装指南
下一篇:AI编程实战:大模型选型指南,像组建团队一样配置你的AI编码工具
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-23 14:19 , Processed in 0.801358 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表