找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3522

积分

0

好友

496

主题
发表于 18 小时前 | 查看: 1| 回复: 0

在之前的技术分享中,我们探讨了一些特定形态的识别算法。然而,这些方法往往因其针对性,仅对特殊形态有效。因此,设计一款更具普适性的波段识别算法,就成为了一个值得深入研究的课题。

技术分析建立在三大基本假设之上:市场包容消化一切、价格以趋势方式演变、历史会重演。波段形态识别的核心,正是基于这些假设——通过分析历史走势,寻找相似形态,总结其规律(概率),并在此基础上对未来行情进行预测。

你一定听说过“波浪理论”,也常听到“千人千浪”的说法。从本质上讲,这套理论是为市场走势寻找一种解释框架。当然,其中也包含一些明确的结论,本文将应用的一个关键结论是(以上涨波浪为例):

  • 1、3、5浪的高点依次上升。
  • 2、4浪的低点也依次上升。

基于此,一个可行的策略思路是:识别出第1浪后,在第3浪的起始阶段入场,待价格上涨超过第1浪高点后,在回调某个固定比例时平仓离场。如下图所示:

Z字形波段识别策略目标示意图

该策略的目标在于,在点“1”时,需要完成以下四步:

  1. 形态识别:有效识别当前走势形态,并在历史数据中找出所有类似的形态。
  2. 计算概率:基于上一步找出的相似形态,计算未来价格涨幅能超过点 a 的概率。
  3. 构建策略买点:若上述概率超过预设阈值,则在点1处买入标的。
  4. 构建策略卖点:若后期价格涨过点 a,则在回撤某个阈值时平仓;若价格未能涨过 a 点,则在从高点回撤超过止损阈值时离场。

综上所述,本文旨在提供一种能够有效识别波段形态的算法。在此算法框架下,我们可以量化当前形态在历史上演变为上涨或下跌趋势的概率,并据此构建择时或选股策略。

本文的灵感来源于券商研究报告,并进行了算法改进与代码实现。由于内容较多,将分为上、中、下三篇:

  • 上篇:传统方法简介与新方法的数据预处理(即本篇)。
  • 中篇:形态识别新方法的原理、测试与部分代码实现。
  • 下篇:基于识别结果的策略构建与回测。

基本知识

什么是波段

在波浪理论与道氏理论中,股价走势最基本的构成单元就是“Z”字形波浪。所有复杂的波浪形态,都是由这些基础的Z字形波段连接而成。因此,本文的核心目标就是识别这些Z字形波段。

下面是一些由Z字形波构成的经典技术形态:

  1. 头肩顶形态
    头肩顶形态示意图
    显然是由两个Z字形态构成。
  2. M头形态
    M头形态示意图
    由多个Z字形态构成。
  3. 5浪上涨形态
    5浪上涨形态示意图
    显然也是由Z字形波段构成。

其他形态不一一列举,相信你已经理解了“波段”的基本概念。

识别方法

技术分析中有一个普遍认同的观点:价格在空间上的变化,其重要性远大于时间上的变化。因此,在设计识别算法时,我们应更侧重于价格变化本身。

传统识别方法

目前最常见的相似性匹配策略,大多采用固定时间窗口,然后在历史数据中进行滑动匹配。

传统算法步骤

  1. 将价格数据进行归一化或标准化处理。
  2. 在固定长度的序列空间中计算欧氏距离,或计算两者之间的相关系数。

传统方法的局限性

  1. 窗口固定:固定的时间窗口限制了价格在空间上的变化幅度,导致基于欧氏距离的识别效果往往不理想。
  2. 时间错位:两个走势相似的波段,其高低点出现的时间位置可能不同,这会使欧氏距离计算失效,导致无法识别出本应相似的图形。
  3. 忽略相对位置:在技术分析中,Z字形波浪中各高低点的相对位置(例如高点是否依次抬高)至关重要,它直接反映了多空力量的对比。但传统欧氏距离方法并未对此做出明确判断。

接下来,我们将介绍一种能够有效解决上述问题的新识别方法。

新方法概述

新方法的整体流程如下图所示,参考并改进了券商研报的思路:

波段识别新方法流程图

整个流程可以看作一个完整的 数据处理 管道,其目的是从嘈杂的原始价格中提取出清晰的波段拓扑结构。

第一步:布林带归一化处理

与传统识别技术直接对收盘价进行标准化不同,新方法采用布林带(Bollinger Bands)的上下轨对原始收盘价进行归一化处理,计算所谓的“%b指标”。

归一化的优点
相较于股票原始价格,%b指标具备以下优势:

  • 趋势同步:指标趋势与价格变化同步,价格下跌时,%b同步下降,无延迟。
  • 数值标准化:%b指标值通常标准化在-1至2之间,便于制定通用的数据预处理流程,有效减少股价波动的噪音干扰。
  • 敏感放大:%b指标对股票波动有放大作用,能更敏感地检测波动,缩短关键点位确认的滞后时间。

计算方法
%b指标的计算公式为:%b = (收盘价 - 布林带下轨) / (布林带上轨 - 布林带下轨)

结果展示
原始价格序列经过布林带归一化处理后,得到%b指标序列,如下图所示:
股价布林带归一化后%b指标图

第二步:确定高低点(ZIG分割)

经过第一步转换后,我们仍然无法清晰识别出符合条件的波峰和波谷。这就需要引入一个关键技术:ZIG分割

可能有些读者认为ZIG函数涉及“未来函数”,但请注意,本文所述方法完全可以通过时间序列的逐步推进来剔除未来信息。请放心,所有算法和结论均不存在未来函数问题。

1. 具体计算逻辑
如下图所示:
ZIG分割算法示意图
假设当前点为点3,我们以点3为起始点。若后续点位上涨,则向后移动,扩大观察窗口。点7和点8高度相同,我们取后一点(点8)作为高点。点9和点10均低于先前高点,当运行到点10时,该点到最高点(点8)的回撤距离超过了预设阈值(δ₁),因此我们确认点10为该窗口的终点,并在此过程中确定了高点(点8)。

同理,点8将成为下一个分割段的起始点,开始新一轮的划分。最终,我们得到了一系列Z字形分割段。图中,点8到点10是为了确认一个潜在高点所需的时间窗口。经过完整的ZIG分割后,效果如下图:
完整ZIG分割效果图

2. 数学公式
给定当前窗口起始点 t_s,则该窗口的极值点 t_e(高点或低点)需满足:

  1. P(t_e) 是当前窗口内的最大值(对于上涨段)或最小值(对于下跌段)。
  2. t_e 到窗口内后续某一点 t 的回撤(或反弹)幅度 |P(t) - P(t_e)| ≥ δ₁
  3. t_e 是满足条件1和2的最后一个点。

3. 效果展示
对%b指标序列进行ZIG分割后,得到的效果如下图所示:
ZIG分割后的%b指标转折点图

第三步:指标线的裁剪

即使经过ZIG分割,结果中仍可能存在一些幅度较小的“噪音”波动。因此,我们需要引入第二个阈值 δ₂ 进行裁剪。

裁剪规则:如下图所示,在上涨趋势中,若点4的高度高于点2,且点3相较于点2的跌幅小于阈值 δ₂,那么我们认为上涨趋势未被破坏,因此删除中间的点2和点3,直接将点1与点4连接。
ZIG裁剪规则示意图

下跌趋势中的处理逻辑与此类似。经过这一步裁剪后,得到的形态线条更加清晰平滑:
裁剪后的%b指标波段图

第四步:原始价格数据映射与裁剪

我们将上一步裁剪后得到的%b指标转折点,通过其索引位置映射回原始价格序列,得到对应的价格转折点。
映射回原始价格的转折点图

如图所示,映射后的原始价格转折点序列可能仍包含一些噪声。为了进一步净化信号,我们基于价格本身的幅度,再次应用类似的裁剪逻辑,最终得到清晰的波段结构:
最终裁剪后的清晰价格波段图

此时,价格的上升、下降波段一目了然。

流程总结

整个数据处理与波段提取的流程可总结为以下示意图:
波段识别完整数据处理流程图

核心代码实现

以下是用 Python 实现上述数据预处理流程的核心代码,包括布林带计算、ZIG分割和裁剪函数。

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from typing import List, Tuple, Dict
import warnings
warnings.filterwarnings('ignore')

# ====================== 参数配置 ======================
BB_WINDOW = 20  # 布林带窗口
BB_STD = 2      # 标准差倍数
ZIG_THRESHOLD = 0.005  # ZIG分割阈值δ₁ (0.5%)
TRIM_THRESHOLD = 0.003  # 裁剪阈值δ₂ (0.3%)
SIMILARITY_GAMMA = 0.1  # 相似距离阈值γ
PREDICT_EPSILON = 0.005 # 趋势判断阈值ε
STOP_LOSS = 0.005      # 止损比例
LOOKBACK_WINDOW = 4    # 匹配波段长度(包含几个端点)

# ====================== 数据加载 ======================
def load_data(file_path: str) -> pd.DataFrame:
    """加载本地CSV文件,需包含日期、开盘、最高、最低、收盘"""
    df = pd.read_csv(file_path, parse_dates=['date'])
    df.sort_values('date', inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df

# ====================== 技术指标 ======================
def compute_bollinger_bands(close: pd.Series, window: int = BB_WINDOW, std: int = BB_STD) -> pd.DataFrame:
    """计算布林带及%b指标"""
    ma = close.rolling(window).mean()
    std_ = close.rolling(window).std()
    upper = ma + std * std_
    lower = ma - std * std_
    b = (close - lower) / (upper - lower)  # %b指标
    return pd.DataFrame({'close': close, 'ma': ma, 'upper': upper, 'lower': lower, 'b': b})

# ====================== ZIG分割与裁剪 ======================
def zig_segment(series: pd.Series, threshold: float) -> List[Tuple[int, float, str]]:
    """
    ZIG分割:识别转折点
    返回列表[(索引, 值, 类型)],类型为'peak'或'trough'
    """
    points = []
    if len(series) < 2:
        return points

    # 初始方向:由前两个点决定
    idx0, val0 = 0, series.iloc[0]
    idx1, val1 = 1, series.iloc[1]
    direction = 1 if val1 > val0 else -1  # 1上涨,-1下跌
    last_peak_trough_idx = idx0 if direction == 1 else idx1  # 暂存极值点索引

    for i in range(2, len(series)):
        val = series.iloc[i]
        if direction == 1:  # 当前在上涨段,寻找新的高点
            if val > series.iloc[last_peak_trough_idx]:
                last_peak_trough_idx = i
            elif val <= series.iloc[last_peak_trough_idx] - threshold:
                # 从高点回撤超过阈值,确认高点
                points.append((last_peak_trough_idx, series.iloc[last_peak_trough_idx], 'peak'))
                direction = -1
                last_peak_trough_idx = i
        else:  # 当前在下跌段,寻找新的低点
            if val < series.iloc[last_peak_trough_idx]:
                last_peak_trough_idx = i
            elif val >= series.iloc[last_peak_trough_idx] + threshold:
                # 从低点反弹超过阈值,确认低点
                points.append((last_peak_trough_idx, series.iloc[last_peak_trough_idx], 'trough'))
                direction = 1
                last_peak_trough_idx = i
    # 最后一个点作为终点(类型根据方向推断)
    if direction == 1:
        points.append((len(series)-1, series.iloc[-1], 'peak'))  # 若最后方向向上,终点为peak
    else:
        points.append((len(series)-1, series.iloc[-1], 'trough'))
    return points

def trim_zig(points: List[Tuple[int, float, str]], threshold: float) -> List[Tuple[int, float, str]]:
    """
    裁剪:合并幅度小于threshold的相邻波动
    算法:循环检查相邻点幅度,若小于阈值则删除中间点
    """
    if len(points) < 3:
        return points
    trimmed = points[:]
    changed = True
    while changed:
        changed = False
        i = 1
        while i < len(trimmed)-1:
            prev = trimmed[i-1]
            curr = trimmed[i]
            nxt = trimmed[i+1]
            # 计算当前波动幅度
            amp = abs(curr[1] - prev[1])
            if amp < threshold:
                # 删除当前点,连接前后
                del trimmed[i]
                changed = True
                continue
            i += 1
    return trimmed

def get_price_waves(close: pd.Series, b: pd.Series, zig_thresh: float, trim_thresh: float) -> pd.DataFrame:
    """
    完整数据处理流程:
    1. 对%b做ZIG分割
    2. 裁剪%b的ZIG结果
    3. 将裁剪后的转折点索引映射回原始价格
    4. 对原始价格再裁剪
    返回带转折点标记的价格序列
    """
    # 对%b分割
    b_points = zig_segment(b, zig_thresh)
    # 裁剪%b转折点
    b_points_trimmed = trim_zig(b_points, trim_thresh)
    # 提取索引
    idx_b = [p[0] for p in b_points_trimmed]
    # 构造原始价格序列的候选转折点(直接取对应索引的价格)
    price_points = [(idx, close.iloc[idx], p[2]) for idx, p in zip(idx_b, b_points_trimmed)]
    # 对原始价格再裁剪(由于映射后可能仍有小幅波动)
    price_points_trimmed = trim_zig(price_points, trim_thresh * close.mean())  # 阈值转换为绝对价格
    return pd.DataFrame(price_points_trimmed, columns=['idx', 'price', 'type'])

总结

本文作为波段识别系列的上篇,主要介绍了传统识别方法的局限性,并重点阐述了一种新方法的数据预处理全流程。该流程通过布林带归一化、ZIG分割和多级裁剪,有效地从原始股价数据中剔除了噪音,提取出了能够清晰反映价格运行本质的波段结构,为后续的形态相似性匹配与策略构建打下了坚实的基础。在接下来的中篇,我们将深入讲解基于此预处理结果的形态识别新算法。欢迎在云栈社区交流更多关于 技术分析与策略 实现的想法。




上一篇:当技术组长发现我的代码全是AI生成的:关于编程基本功的反思
下一篇:QMD向量搜索GPU加速实战:版本匹配与编译,性能提升6倍
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-25 19:37 , Processed in 0.353199 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表