找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

520

积分

0

好友

74

主题
发表于 19 小时前 | 查看: 1| 回复: 0

技术背景

传统的指数增强方法,通常是在目标股票池(如行业、指数成分股)中,依据一系列因子筛选出排名靠前的个股。其核心流程可归纳为:单因子计算、因子预处理、因子加权合成、因子评价、最终形成策略。行业内普遍关注于如何挖掘和合成更多、更有效的“因子”。

本文从一个不同的视角出发,提出一种另类的指数增强选股方法。该方法的核心思想是:以基准指数的行业分布为约束,在保持指数整体行业暴露不变的前提下,在全市场范围内进行多因子选股,用以替换原指数中对应行业的成分股。 这本质上是一种行业中性的增强思路。

需要注意:本文方法主要适用于宽基指数(如沪深300)。对于行业指数,理论上可以拓展为保持其他风格因子(如市值、估值)暴露不变,再进行类似操作。

原理解析

通常认为,综合指数的涨跌主要受其成分行业的走势影响,而非单一个股。例如,上证50指数受金融行业(银行、券商、保险)影响显著;创业板指数则与新能源、半导体、通信等行业高度相关。

基于此认知,指数增强可以分解为:指数 -> 行业 -> 个股 -> 行业增强 -> 指数增强。换言之,如果我们能在每个行业内选出表现有望超越行业平均的个股,并将它们按原行业权重重新组合,便可在控制行业风险暴露的同时,力求实现超越基准的超额收益。

行业中性指数增强原理图

具体步骤

整个策略的实施可分为四个核心步骤:

  1. 计算基准行业暴露:获取基准指数(例如沪深300)的成分股及其行业分类,计算各行业在指数中的权重。此权重将作为后续组合构建的行业配置依据。
  2. 挑选行业内增强个股:针对每个行业,计算其内部所有股票(或全市场属于该行业的股票)的综合因子得分。选取每个行业内排名前10%的股票作为增强组合的备选股。这一步是策略超额收益的关键来源,其本质是多因子选股模型在行业内的应用。
  3. 计算行业内个股权重:为了简化模型,通常对选定行业内的个股采用等权配置。当然,也可以根据市值、因子得分或其他规则进行加权,以满足不同的风险收益偏好。
  4. 确定行业间权重:各行业的总资金配比,严格遵循第一步计算出的基准指数行业权重,以确保组合与基准在行业层面保持中性。

Python代码实战

以下提供一个完整的Python代码框架,实现了上述行业中性指数增强策略,包含数据模拟、因子计算、选股、权重分配和回测模块。你可以根据实际的数据接口和因子库进行替换和扩展。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

class IndustryNeutralEnhancedIndex:
    """
    行业中性指数增强策略
    1. 在每个行业中选择综合因子前10%的股票
    2. 行业权重与基准指数一致
    """

    def __init__(self, benchmark='沪深300', start_date='2020-01-01', end_date='2023-12-31',
                 top_percent=0.1, rebalance_freq='M'):
        """
        初始化策略

        参数:
        ----------
        benchmark : str
            基准指数名称
        start_date : str
            开始日期
        end_date : str
            结束日期
        top_percent : float
            选择前百分之多少的股票
        rebalance_freq : str
            调仓频率,M=月,Q=季
        """
        self.benchmark = benchmark
        self.start_date = start_date
        self.end_date = end_date
        self.top_percent = top_percent
        self.rebalance_freq = rebalance_freq

        # 存储回测结果
        self.portfolio_returns = None
        self.positions_history = {}
        self.benchmark_returns = None

    def load_data(self):
        """
        加载数据(这里使用模拟数据,实际使用时替换为真实数据接口)
        需要的数据包括:
        1. 股票价格数据
        2. 财务数据
        3. 行业分类数据
        4. 基准指数成分和权重
        """
        print("加载数据...")

        # 创建模拟数据
        dates = pd.date_range(self.start_date, self.end_date, freq='D')
        n_stocks = 500
        n_days = len(dates)

        # 模拟价格数据
        np.random.seed(42)
        base_prices = np.random.uniform(5, 100, n_stocks)

        # 生成随机收益率序列
        daily_returns = np.random.normal(0.0005, 0.02, (n_days, n_stocks))

        # 累积得到价格序列
        price_data = pd.DataFrame(
            np.exp(np.cumsum(daily_returns, axis=0)) * base_prices,
            index=dates,
            columns=[f'Stock_{i:03d}' for i in range(n_stocks)]
        )

        # 模拟行业分类(假设有10个行业)
        industries = [f'Industry_{i}' for i in range(1, 11)]
        self.industry_map = pd.Series(
            np.random.choice(industries, n_stocks),
            index=price_data.columns
        )

        # 模拟基准指数权重
        benchmark_weights = np.random.dirichlet(np.ones(n_stocks), 1).flatten()
        self.benchmark_weights = pd.Series(
            benchmark_weights,
            index=price_data.columns
        )

        # 模拟财务数据(用于因子计算)
        self.factors_data = {}

        # 价值因子(市盈率倒数)
        pe_ratio = np.random.uniform(5, 30, n_stocks)
        self.factors_data['value'] = pd.Series(1 / pe_ratio, index=price_data.columns)

        # 质量因子(ROE)
        roe = np.random.normal(0.1, 0.05, n_stocks)
        self.factors_data['quality'] = pd.Series(roe, index=price_data.columns)

        # 动量因子(过去60天收益率)
        returns_60d = price_data.pct_change(60).iloc[-1]
        self.factors_data['momentum'] = returns_60d

        # 波动率因子
        volatility = price_data.pct_change().rolling(20).std().iloc[-1]
        self.factors_data['low_vol'] = -volatility  # 低波动为负,取负使得数值越大越好

        self.price_data = price_data
        print(f"数据加载完成: {len(dates)}个交易日, {n_stocks}只股票")

    def calculate_composite_factor(self, date):
        """
        计算综合因子

        参数:
        ----------
        date : datetime
            计算日期

        返回:
        ----------
        pd.Series
            每只股票的综合因子得分
        """
        # 获取计算日期的因子数据
        # 注意:实际中需要根据date获取最新的因子数据
        # 这里使用模拟的静态因子数据

        # 计算因子Z-score
        factor_scores = pd.DataFrame(index=self.price_data.columns)

        for factor_name, factor_values in self.factors_data.items():
            # 去极值和标准化
            factor_zscore = self._winsorize_and_standardize(factor_values)
            factor_scores[factor_name] = factor_zscore

        # 因子权重(可以优化)
        weights = {
            'value': 0.3,
            'quality': 0.3,
            'momentum': 0.2,
            'low_vol': 0.2
        }

        # 计算综合因子
        composite_factor = pd.Series(0.0, index=factor_scores.index)
        for factor, weight in weights.items():
            if factor in factor_scores.columns:
                composite_factor += factor_scores[factor] * weight

        return composite_factor

    def _winsorize_and_standardize(self, series, lower=0.01, upper=0.99):
        """
        去极值和标准化
        """
        # 去极值
        lower_bound = series.quantile(lower)
        upper_bound = series.quantile(upper)
        winsorized = series.clip(lower_bound, upper_bound)

        # 标准化
        standardized = (winsorized - winsorized.mean()) / winsorized.std()

        return standardized

    def get_industry_weights_from_benchmark(self):
        """
        从基准指数获取行业权重
        """
        # 汇总各行业权重
        industry_weights = pd.Series(0.0, index=self.industry_map.unique())

        for stock, weight in self.benchmark_weights.items():
            industry = self.industry_map[stock]
            industry_weights[industry] += weight

        return industry_weights / industry_weights.sum()

    def select_stocks_within_industry(self, composite_factor, date):
        """
        在每个行业内选择前top_percent的股票

        参数:
        ----------
        composite_factor : pd.Series
            综合因子得分
        date : datetime
            选股日期

        返回:
        ----------
        dict
            各行业选中的股票列表
        """
        # 将因子得分与行业信息结合
        factor_df = pd.DataFrame({
            'factor_score': composite_factor,
            'industry': self.industry_map
        })

        selected_stocks = {}

        for industry in factor_df['industry'].unique():
            industry_stocks = factor_df[factor_df['industry'] == industry]

            if len(industry_stocks) == 0:
                continue

            # 按因子得分排序
            industry_stocks = industry_stocks.sort_values('factor_score', ascending=False)

            # 计算选择的股票数量(至少1只)
            n_select = max(1, int(np.ceil(len(industry_stocks) * self.top_percent)))

            # 选择前n_select只
            selected = industry_stocks.head(n_select).index.tolist()
            selected_stocks[industry] = selected

            # 可选:记录因子得分和排名
            if f"selection_{date.date()}" not in self.positions_history:
                self.positions_history[f"selection_{date.date()}"] = {}

            self.positions_history[f"selection_{date.date()}"][industry] = {
                'stocks': selected,
                'scores': industry_stocks.head(n_select)['factor_score'].values.tolist()
            }

        return selected_stocks

    def calculate_portfolio_weights(self, selected_stocks_by_industry, date):
        """
        计算组合权重

        参数:
        ----------
        selected_stocks_by_industry : dict
            各行业选中的股票
        date : datetime
            权重计算日期

        返回:
        ----------
        pd.Series
            每只股票的权重
        """
        # 获取基准行业权重
        industry_weights = self.get_industry_weights_from_benchmark()

        # 初始化权重
        portfolio_weights = pd.Series(0.0, index=self.price_data.columns)

        for industry, stocks in selected_stocks_by_industry.items():
            if industry not in industry_weights.index or len(stocks) == 0:
                continue

            # 行业总权重
            industry_weight = industry_weights[industry]

            # 行业内等权分配
            weight_per_stock = industry_weight / len(stocks)

            for stock in stocks:
                portfolio_weights[stock] = weight_per_stock

        # 归一化(处理四舍五入误差)
        portfolio_weights = portfolio_weights / portfolio_weights.sum()

        return portfolio_weights

    def get_rebalance_dates(self):
        """
        获取调仓日期
        """
        all_dates = self.price_data.index

        if self.rebalance_freq == 'M':
            # 每月调仓(月底)
            rebalance_dates = []
            for date in all_dates:
                if date.month != (date + pd.Timedelta(days=1)).month:
                    rebalance_dates.append(date)
        elif self.rebalance_freq == 'Q':
            # 每季度调仓
            quarter_ends = [3, 6, 9, 12]
            rebalance_dates = []
            for date in all_dates:
                if date.month in quarter_ends and date.month != (date + pd.Timedelta(days=1)).month:
                    rebalance_dates.append(date)
        else:
            # 默认为每月
            rebalance_dates = all_dates[::22]  # 大约每月

        return rebalance_dates

    def backtest(self, initial_capital=1000000, transaction_cost=0.001):
        """
        回测策略

        参数:
        ----------
        initial_capital : float
            初始资金
        transaction_cost : float
            交易成本(单边)

        返回:
        ----------
        dict
            回测结果
        """
        print("开始回测...")

        # 获取调仓日期
        rebalance_dates = self.get_rebalance_dates()
        print(f"回测期间: {self.start_date} 到 {self.end_date}")
        print(f"调仓频率: {self.rebalance_freq}, 共{len(rebalance_dates)}次调仓")

        # 初始化
        capital = initial_capital
        portfolio_value = [capital]
        dates_record = [self.price_data.index[0]]

        # 当前持仓权重
        current_weights = pd.Series(0.0, index=self.price_data.columns)

        # 存储每次调仓信息
        rebalance_info = {}

        for i, rebalance_date in enumerate(rebalance_dates):
            if rebalance_date >= self.price_data.index[-1]:
                break

            # 获取调仓日期的价格
            rebalance_idx = self.price_data.index.get_loc(rebalance_date)

            # 计算综合因子
            composite_factor = self.calculate_composite_factor(rebalance_date)

            # 选择股票
            selected_stocks = self.select_stocks_within_industry(composite_factor, rebalance_date)

            # 计算目标权重
            target_weights = self.calculate_portfolio_weights(selected_stocks, rebalance_date)

            # 记录调仓信息
            rebalance_info[rebalance_date] = {
                'target_weights': target_weights.copy(),
                'selected_stocks': selected_stocks.copy(),
                'n_stocks': target_weights[target_weights > 0].count()
            }

            # 计算调仓成本
            weight_change = target_weights - current_weights
            turnover = weight_change.abs().sum() / 2
            cost = capital * turnover * transaction_cost

            # 更新资金(扣除交易成本)
            capital -= cost

            # 更新持仓
            current_weights = target_weights.copy()

            # 计算到下个调仓日(或结束日)的收益
            if i < len(rebalance_dates) - 1:
                next_rebalance = rebalance_dates[i + 1]
            else:
                next_rebalance = self.price_data.index[-1]

            # 获取期间价格数据
            period_prices = self.price_data.loc[rebalance_date:next_rebalance]

            # 计算每日收益
            for j in range(1, len(period_prices)):
                date = period_prices.index[j]
                prev_date = period_prices.index[j-1]

                # 计算组合收益率
                returns = (period_prices.loc[date] / period_prices.loc[prev_date] - 1)
                daily_return = (current_weights * returns).sum()

                # 更新资金
                capital *= (1 + daily_return)

                # 记录
                portfolio_value.append(capital)
                dates_record.append(date)

        # 计算基准收益
        if self.benchmark_weights is not None:
            # 计算基准收益(等权基准)
            benchmark_values = []
            for date in dates_record:
                if date in self.price_data.index:
                    # 简单假设基准等权持有所有股票
                    returns = (self.price_data.loc[date] / self.price_data.iloc[0] - 1).mean()
                    benchmark_value = initial_capital * (1 + returns)
                    benchmark_values.append(benchmark_value)
                else:
                    benchmark_values.append(np.nan)

            # 填充缺失值
            benchmark_values = pd.Series(benchmark_values, index=dates_record).fillna(method='ffill')
            self.benchmark_returns = benchmark_values.pct_change().fillna(0)

        # 计算组合收益
        portfolio_series = pd.Series(portfolio_value, index=dates_record)
        self.portfolio_returns = portfolio_series.pct_change().fillna(0)
        self.portfolio_values = portfolio_series

        # 计算回测指标
        results = self.calculate_performance_metrics(portfolio_series, benchmark_values)
        results['rebalance_info'] = rebalance_info

        print("回测完成!")
        return results

    def calculate_performance_metrics(self, portfolio_values, benchmark_values):
        """
        计算回测绩效指标
        """
        # 计算收益率
        total_return = portfolio_values.iloc[-1] / portfolio_values.iloc[0] - 1
        annual_return = (1 + total_return) ** (252 / len(portfolio_values)) - 1

        # 计算波动率
        returns = portfolio_values.pct_change().dropna()
        annual_vol = returns.std() * np.sqrt(252)

        # 夏普比率
        risk_free_rate = 0.03
        sharpe_ratio = (annual_return - risk_free_rate) / annual_vol if annual_vol > 0 else 0

        # 最大回撤
        cumulative = (1 + returns).cumprod()
        peak = cumulative.expanding(min_periods=1).max()
        drawdown = (cumulative - peak) / peak
        max_drawdown = drawdown.min()

        # 如果有基准,计算超额收益
        if benchmark_values is not None:
            benchmark_returns = benchmark_values.pct_change().dropna()
            excess_returns = returns.reindex(benchmark_returns.index) - benchmark_returns

            # 信息比率
            tracking_error = excess_returns.std() * np.sqrt(252)
            information_ratio = excess_returns.mean() * 252 / tracking_error if tracking_error > 0 else 0

            # 胜率
            win_rate = (excess_returns > 0).mean()

            metrics = {
                '总收益率': total_return,
                '年化收益率': annual_return,
                '年化波动率': annual_vol,
                '夏普比率': sharpe_ratio,
                '最大回撤': max_drawdown,
                '信息比率': information_ratio,
                '超额收益胜率': win_rate,
                '跟踪误差': tracking_error
            }
        else:
            metrics = {
                '总收益率': total_return,
                '年化收益率': annual_return,
                '年化波动率': annual_vol,
                '夏普比率': sharpe_ratio,
                '最大回撤': max_drawdown
            }

        return metrics

    def plot_results(self):
        """
        绘制回测结果
        """
        if self.portfolio_values is None:
            print("请先运行回测!")
            return

        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # 1. 净值曲线
        ax1 = axes[0, 0]
        ax1.plot(self.portfolio_values.index, self.portfolio_values / self.portfolio_values.iloc[0],
                label='策略净值', linewidth=2)

        if self.benchmark_returns is not None:
            # 重建基准净值
            benchmark_values = (1 + self.benchmark_returns).cumprod()
            benchmark_values = benchmark_values * self.portfolio_values.iloc[0]
            ax1.plot(benchmark_values.index, benchmark_values / benchmark_values.iloc[0],
                    label='基准净值', linewidth=2, alpha=0.7)

        ax1.set_title('净值曲线')
        ax1.set_xlabel('日期')
        ax1.set_ylabel('净值')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # 2. 月度收益分布
        ax2 = axes[0, 1]
        monthly_returns = self.portfolio_returns.resample('M').apply(lambda x: (1 + x).prod() - 1)
        ax2.hist(monthly_returns * 100, bins=20, edgecolor='black', alpha=0.7)
        ax2.axvline(x=monthly_returns.mean() * 100, color='red', linestyle='--', label=f'均值: {monthly_returns.mean()*100:.2f}%')
        ax2.set_title('月度收益分布')
        ax2.set_xlabel('月度收益率(%)')
        ax2.set_ylabel('频率')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        # 3. 滚动波动率
        ax3 = axes[1, 0]
        rolling_vol = self.portfolio_returns.rolling(window=60).std() * np.sqrt(252)
        ax3.plot(rolling_vol.index, rolling_vol * 100, linewidth=2)
        ax3.axhline(y=rolling_vol.mean() * 100, color='red', linestyle='--',
                  label=f'平均: {rolling_vol.mean()*100:.2f}%')
        ax3.set_title('滚动年化波动率(60日窗口)')
        ax3.set_xlabel('日期')
        ax3.set_ylabel('波动率(%)')
        ax3.legend()
        ax3.grid(True, alpha=0.3)

        # 4. 月度收益热力图
        ax4 = axes[1, 1]
        # 创建月度收益透视表
        monthly_returns_series = self.portfolio_returns.resample('M').apply(lambda x: (1 + x).prod() - 1)
        monthly_returns_df = pd.DataFrame({
            'Year': monthly_returns_series.index.year,
            'Month': monthly_returns_series.index.month,
            'Return': monthly_returns_series.values
        })

        # 转换为透视表
        pivot_table = monthly_returns_df.pivot(index='Year', columns='Month', values='Return')

        # 绘制热力图
        im = ax4.imshow(pivot_table * 100, cmap='RdYlGn', aspect='auto')

        # 设置坐标轴
        ax4.set_title('月度收益热力图(%)')
        ax4.set_xlabel('月份')
        ax4.set_ylabel('年份')

        # 设置刻度
        ax4.set_xticks(range(12))
        ax4.set_xticklabels(range(1, 13))

        # 添加颜色条
        plt.colorbar(im, ax=ax4)

        # 在每个单元格添加文本
        for i in range(len(pivot_table)):
            for j in range(12):
                if j in pivot_table.columns and i in pivot_table.index:
                    value = pivot_table.iloc[i, j] * 100 if not pd.isna(pivot_table.iloc[i, j]) else np.nan
                    if not pd.isna(value):
                        ax4.text(j, i, f'{value:.1f}', ha='center', va='center', fontsize=8)

        plt.tight_layout()
        plt.show()

    def analyze_holdings(self, date=None):
        """
        分析持仓情况

        参数:
        ----------
        date : datetime
            分析日期,默认为最近调仓日
        """
        if not self.positions_history:
            print("没有持仓历史数据!")
            return

        if date is None:
            # 使用最后一次调仓
            selection_keys = [k for k in self.positions_history.keys() if k.startswith('selection_')]
            if not selection_keys:
                return
            latest_key = sorted(selection_keys)[-1]
            holdings = self.positions_history[latest_key]
        else:
            date_key = f"selection_{date.date()}"
            if date_key not in self.positions_history:
                print(f"日期 {date} 没有持仓数据!")
                return
            holdings = self.positions_history[date_key]

        print(f"\n{'='*50}")
        print(f"持仓分析")
        print(f"{'='*50}")

        total_stocks = 0
        for industry, data in holdings.items():
            stocks = data['stocks']
            n_stocks = len(stocks)
            total_stocks += n_stocks
            print(f"\n行业: {industry}")
            print(f"  股票数量: {n_stocks}")
            print(f"  平均因子得分: {np.mean(data['scores']):.4f}")
            if len(stocks) <= 5:  # 只显示前几只
                print(f"  股票列表: {', '.join(stocks[:5])}")

        print(f"\n总计: {total_stocks} 只股票")
        print(f"{'='*50}")

# 使用示例
def main():
    # 创建策略实例
    strategy = IndustryNeutralEnhancedIndex(
        benchmark='沪深300',
        start_date='2020-01-01',
        end_date='2023-12-31',
        top_percent=0.1,  # 选择前10%
        rebalance_freq='M'  # 月度调仓
    )

    # 加载数据
    strategy.load_data()

    # 运行回测
    results = strategy.backtest(
        initial_capital=1000000,
        transaction_cost=0.001
    )

    # 打印绩效指标
    print("\n" + "="*50)
    print("回测绩效指标")
    print("="*50)
    for metric, value in results.items():
        if metric != 'rebalance_info':
            if isinstance(value, float):
                if '率' in metric or '比' in metric:
                    print(f"{metric}: {value:.4f}")
                else:
                    print(f"{metric}: {value:.2%}")

    # 绘制图表
    strategy.plot_results()

    # 分析持仓
    strategy.analyze_holdings()

    return strategy, results

if __name__ == "__main__":
    strategy, results = main()

总结与拓展

  1. 本文介绍了一种基于行业暴露控制的指数增强方法。它并非“圣杯”,实际应用中会面临数据质量、因子失效、交易成本等诸多挑战。提供的代码核心在于阐述方法步骤,供学习参考。
  2. 该方法的核心框架可以概括为:将基准指数按关键风格因子(本文是行业)拆解,在各类风格内部运用多因子选股方法挑选强势个股进行替换,最终保持组合整体的风格暴露与基准一致。
  3. 对于行业指数,此方法是否有效?理论上,可以将“行业”替换为其他你想保持中性的风格因子(如市值、估值、波动率等),然后进行类似的尝试。



上一篇:Qt GUI开发选型:Qml与QWidget性能对比与实战避坑指南
下一篇:SpringBoot+Vue3前后端分离英语学习系统实战:含完整源码与部署教程
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-8 23:51 , Processed in 0.081616 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表