找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1009

积分

0

好友

131

主题
发表于 3 天前 | 查看: 9| 回复: 0

在实时通信系统中,WebSocket错误处理从来不是简单的try-catch,而是涉及网络状态感知、恢复策略选择、用户体验保障的系统性工程。一个未处理的WebSocket异常可能导致:

• 用户操作无响应但界面“看似正常”
• 关键数据在传输中静默丢失
• 重连风暴引发服务端雪崩

本文将构建一套WebSocket错误处理模式库,从基础重连到高级降级策略,覆盖生产环境中常见的复杂场景。

一、WebSocket连接生命周期与错误分类

1.1 连接状态机设计

清晰定义连接状态是稳健处理的基础。一个健壮的状态机可以防止非法状态转换,并为诊断提供历史依据。

// WebSocket connection state machine
/**
 * Finite-state machine for a single WebSocket connection.
 *
 * `transition()` rejects moves that are not listed in the transition table,
 * records every accepted move in a bounded history (once it exceeds 100
 * entries it is trimmed to the newest 50) and notifies listeners registered
 * through `onStateChange()`. The history also drives
 * `diagnoseConnectionPattern()`.
 */
class WebSocketStateMachine {
  private state: ConnectionState = 'DISCONNECTED';
  private stateHistory: StateTransition[] = [];
  // Creation time, so that the initial state also has a measurable duration.
  private readonly createdAt = Date.now();
  private readonly stateChangeListeners = new Set<(transition: StateTransition) => void>();

  // Transition table: for each target state, the states it may be entered from.
  private readonly states = {
    DISCONNECTED: { from: ['CONNECTED', 'RECONNECTING', 'ERROR', 'DISCONNECTING'] },
    CONNECTING: { from: ['DISCONNECTED', 'RECONNECTING'] },
    // DEGRADED -> CONNECTED: a degraded connection recovers to full service.
    CONNECTED: { from: ['CONNECTING', 'DEGRADED'] },
    RECONNECTING: { from: ['CONNECTED', 'ERROR', 'DEGRADED'] },
    ERROR: { from: ['CONNECTING', 'CONNECTED', 'RECONNECTING', 'DEGRADED'] },
    DEGRADED: { from: ['CONNECTED', 'RECONNECTING', 'ERROR'] },
    // Closing may start wherever canPerformAction('CLOSE') allows it.
    DISCONNECTING: { from: ['CONNECTED', 'CONNECTING', 'RECONNECTING', 'DEGRADED'] }
  };

  /**
   * Moves the machine to `to`.
   * @param to Target state.
   * @param reason Optional free-text cause; `diagnoseConnectionPattern()` searches it for 'timeout'.
   * @throws Error when `to` cannot be entered from the current state.
   */
  transition(to: ConnectionState, reason?: string): void {
    const validFrom = this.states[to].from;
    if (!validFrom.includes(this.state)) {
      throw new Error(`Invalid transition: ${this.state} -> ${to}`);
    }

    const transition: StateTransition = {
      from: this.state,
      to,
      timestamp: Date.now(),
      reason
    };
    this.stateHistory.push(transition);
    this.state = to;

    // Keep the history bounded: once past 100 entries, retain only the newest 50.
    if (this.stateHistory.length > 100) {
      this.stateHistory = this.stateHistory.slice(-50);
    }

    this.notifyStateChange(transition);
  }

  /** The current state. */
  getState(): ConnectionState {
    return this.state;
  }

  /**
   * Registers a listener that is called after every accepted transition.
   * @returns A function that unregisters the listener.
   */
  onStateChange(listener: (transition: StateTransition) => void): () => void {
    this.stateChangeListeners.add(listener);
    return () => {
      this.stateChangeListeners.delete(listener);
    };
  }

  /** Whether `action` is permitted in the current state. */
  canPerformAction(action: SocketAction): boolean {
    const actionRules: Record<SocketAction, ConnectionState[]> = {
      SEND: ['CONNECTED', 'DEGRADED'],
      RECONNECT: ['DISCONNECTED', 'ERROR'],
      CLOSE: ['CONNECTED', 'CONNECTING', 'RECONNECTING', 'DEGRADED'],
      UPGRADE: ['CONNECTED']
    };
    return actionRules[action].includes(this.state);
  }

  /** Milliseconds spent in the current state (measured from creation if no transition happened yet). */
  getStateDuration(): number {
    const lastTransition = this.stateHistory[this.stateHistory.length - 1];
    const enteredAt = lastTransition !== undefined ? lastTransition.timestamp : this.createdAt;
    return Date.now() - enteredAt;
  }

  /**
   * Classifies recent behaviour from the transition history:
   *  - FREQUENT_RECONNECT: more than 5 entries into RECONNECTING among the last 10 transitions;
   *  - SHORT_LIVED_CONNECTIONS: more than 3 of those 10 transitions end a connection
   *    (CONNECTED -> DISCONNECTED) that lasted less than 30 s;
   *  - TIMEOUT_PRONE: any retained transition has a reason containing 'timeout';
   *  - otherwise STABLE.
   */
  diagnoseConnectionPattern(): ConnectionPattern {
    const windowStart = Math.max(0, this.stateHistory.length - 10);
    const recentTransitions = this.stateHistory.slice(windowStart);

    const reconnectCount = recentTransitions.filter(t => t.to === 'RECONNECTING').length;

    // A connection's lifetime is the time spent in CONNECTED *before* the
    // disconnect, not the time spent disconnected afterwards.
    const shortConnectionCount = recentTransitions.filter((t, offset) => {
      if (t.from !== 'CONNECTED' || t.to !== 'DISCONNECTED') return false;
      const connectedFor = this.getTimeSpentInFromState(windowStart + offset);
      return connectedFor !== undefined && connectedFor < 30000;
    }).length;

    if (reconnectCount > 5) {
      return 'FREQUENT_RECONNECT';
    }
    if (shortConnectionCount > 3) {
      return 'SHORT_LIVED_CONNECTIONS';
    }
    if (this.stateHistory.some(t => t.reason?.includes('timeout'))) {
      return 'TIMEOUT_PRONE';
    }
    return 'STABLE';
  }

  /**
   * Delivers a transition to every registered listener. A throwing listener is
   * logged and skipped, so the remaining listeners still run and the transition,
   * which has already been applied, stands.
   */
  protected notifyStateChange(transition: StateTransition): void {
    for (const listener of this.stateChangeListeners) {
      try {
        listener(transition);
      } catch (error) {
        console.error('State change listener failed:', error);
      }
    }
  }

  /**
   * Time spent in `stateHistory[index].from`: the gap between the previous
   * transition (which entered that state) and this one. Undefined when the
   * previous entry is no longer in the (trimmed) history.
   */
  private getTimeSpentInFromState(index: number): number | undefined {
    const current = this.stateHistory[index];
    const previous = index > 0 ? this.stateHistory[index - 1] : undefined;
    if (current === undefined || previous === undefined) return undefined;
    return current.timestamp - previous.timestamp;
  }
}

1.2 错误分类体系

并非所有错误都应同等对待。建立分类体系有助于制定针对性的恢复策略。

// WebSocket error classifier
/**
 * Maps raw WebSocket failures (error events, close events, thrown errors or
 * plain objects carrying `code` / `message`) to a category, a severity and
 * retry/notification hints, and from a category to a recovery strategy.
 * Message matching is case-insensitive.
 */
class WebSocketErrorClassifier {
  /**
   * Classifies `error`. Checks run in a fixed priority order (network,
   * authentication, server, protocol, rate limit) and the first match wins;
   * anything unmatched is reported as 'UNKNOWN' with MEDIUM severity.
   */
  classifyError(error: ErrorEvent | Event | any): ErrorClassification {
    const classification: ErrorClassification = {
      category: 'UNKNOWN',
      severity: 'MEDIUM',
      recoverable: true,
      retryImmediately: true,
      userNotification: false
    };

    if (this.isNetworkError(error)) {
      classification.category = 'NETWORK';
      classification.severity = 'LOW';
      classification.retryImmediately = false;
      classification.userNotification = false;
    } else if (this.isAuthenticationError(error)) {
      classification.category = 'AUTHENTICATION';
      classification.severity = 'HIGH';
      classification.recoverable = false;
      classification.retryImmediately = false;
      classification.userNotification = true;
    } else if (this.isServerError(error)) {
      classification.category = 'SERVER';
      classification.severity = 'MEDIUM';
      classification.retryImmediately = false;
      classification.userNotification = true;
    } else if (this.isProtocolError(error)) {
      classification.category = 'PROTOCOL';
      classification.severity = 'HIGH';
      classification.recoverable = false;
      // A non-recoverable error must not advertise an immediate retry.
      classification.retryImmediately = false;
      classification.userNotification = true;
    } else if (this.isRateLimitError(error)) {
      classification.category = 'RATE_LIMIT';
      classification.severity = 'MEDIUM';
      classification.retryImmediately = false;
      classification.userNotification = true;
    }

    return classification;
  }

  /**
   * The error's message lower-cased, or '' when it carries no string message
   * (a non-string `message` would otherwise make `.includes` throw).
   */
  private getMessage(error: any): string {
    const message = error?.message;
    return typeof message === 'string' ? message.toLowerCase() : '';
  }

  private isNetworkError(error: any): boolean {
    const message = this.getMessage(error);
    // `navigator` does not exist in every runtime (e.g. older Node versions).
    const offline = typeof navigator !== 'undefined' && navigator.onLine === false;
    return (
      error?.type === 'error' ||
      message.includes('network error') ||
      message.includes('failed to fetch') ||
      error?.code === 1006 || // Abnormal closure
      offline
    );
  }

  private isAuthenticationError(error: any): boolean {
    const message = this.getMessage(error);
    return (
      error?.code === 4003 || // Custom authentication code
      message.includes('auth') ||
      message.includes('token') ||
      message.includes('unauthorized')
    );
  }

  private isServerError(error: any): boolean {
    const message = this.getMessage(error);
    return (
      error?.code === 1011 || // Internal error
      message.includes('server') ||
      message.includes('internal') ||
      // Close codes: 1001 going away, 1011 internal error, 1012 service restart.
      (error?.type === 'close' && [1001, 1011, 1012].includes(error.code))
    );
  }

  private isProtocolError(error: any): boolean {
    const message = this.getMessage(error);
    return (
      error?.code === 1002 || // Protocol error
      error?.code === 1007 || // Invalid data
      message.includes('protocol') ||
      message.includes('invalid')
    );
  }

  private isRateLimitError(error: any): boolean {
    const message = this.getMessage(error);
    return (
      error?.code === 1008 || // Policy violation
      message.includes('rate limit') ||
      message.includes('too many') ||
      message.includes('throttle')
    );
  }

  /**
   * Recovery strategy for a classification; unknown categories fall back to
   * the NETWORK strategy.
   */
  getRecoveryStrategy(classification: ErrorClassification): RecoveryStrategy {
    const networkStrategy: RecoveryStrategy = {
      action: 'RETRY_WITH_BACKOFF',
      retryDelay: [1000, 3000, 10000, 30000],
      maxAttempts: 5,
      fallback: 'SWITCH_TO_LONG_POLLING'
    };
    const strategies: Record<string, RecoveryStrategy> = {
      NETWORK: networkStrategy,
      AUTHENTICATION: {
        action: 'REAUTHENTICATE',
        retryDelay: [],
        maxAttempts: 1,
        fallback: 'DISCONNECT'
      },
      SERVER: {
        action: 'RETRY_WITH_BACKOFF',
        retryDelay: [5000, 15000, 30000, 60000],
        maxAttempts: 3,
        fallback: 'SWITCH_TO_BACKUP_SERVER'
      },
      PROTOCOL: {
        action: 'UPGRADE_PROTOCOL',
        retryDelay: [],
        maxAttempts: 1,
        fallback: 'DISCONNECT'
      },
      RATE_LIMIT: {
        action: 'WAIT_AND_RETRY',
        retryDelay: [30000, 60000, 120000],
        maxAttempts: 3,
        fallback: 'REDUCE_FREQUENCY'
      }
    };
    return strategies[classification.category] ?? networkStrategy;
  }
}

二、智能重连策略引擎

2.1 多维度重连决策

简单的指数退避已不足以应对复杂网络环境。智能引擎需综合网络质量、历史成功率等多因素决策。

// Intelligent reconnect policy engine
/**
 * Decides whether, when and how aggressively to reconnect after a failure.
 *
 * The decision combines the error classification, a retry budget
 * (`config.maxReconnectAttempts`), an exponential cooldown between attempts,
 * measured network quality, battery state and the recent reconnect success rate.
 */
class IntelligentReconnectEngine {
  /**
   * Base retry delays (ms) per error category, indexed by attempt number.
   * Same schedule as the `retryDelay` tables in
   * WebSocketErrorClassifier.getRecoveryStrategy; unknown categories use the
   * NETWORK schedule, as there.
   */
  private static readonly RETRY_DELAYS: Readonly<Record<string, readonly number[]>> = {
    NETWORK: [1000, 3000, 10000, 30000],
    AUTHENTICATION: [],
    SERVER: [5000, 15000, 30000, 60000],
    PROTOCOL: [],
    RATE_LIMIT: [30000, 60000, 120000]
  };

  /** Upper bound (ms) of the cooldown enforced between reconnect attempts. */
  private static readonly MAX_COOLDOWN_MS = 30000;

  private attemptCount = 0;
  private lastAttemptTime = 0;
  private successHistory: boolean[] = [];
  private networkQualityMonitor: NetworkQualityMonitor;

  constructor(private config: ReconnectConfig) {
    this.networkQualityMonitor = new NetworkQualityMonitor();
  }

  /**
   * Decides whether to reconnect now.
   * @returns When `shouldReconnect` is true: the delay (ms) and strategy to use.
   *   Otherwise `reason` says why; during the cooldown `retryAfter` gives the
   *   remaining wait in ms.
   */
  async shouldReconnect(error: ErrorClassification): Promise<ReconnectDecision> {
    if (!error.recoverable) {
      return { shouldReconnect: false, reason: 'NON_RECOVERABLE_ERROR' };
    }

    if (this.attemptCount >= this.config.maxReconnectAttempts) {
      return { shouldReconnect: false, reason: 'MAX_ATTEMPTS_EXCEEDED' };
    }

    if (!this.isCoolDownPeriodOver()) {
      return {
        shouldReconnect: false,
        reason: 'IN_COOLDOWN_PERIOD',
        retryAfter: this.getRemainingCooldown()
      };
    }

    // After a few failures, stop trying while the network is measurably poor.
    const networkQuality = await this.networkQualityMonitor.getQuality();
    if (networkQuality === 'POOR' && this.attemptCount > 2) {
      return { shouldReconnect: false, reason: 'NETWORK_QUALITY_TOO_POOR' };
    }

    // Save battery on mobile devices.
    if (await this.isLowBattery()) {
      return { shouldReconnect: false, reason: 'LOW_BATTERY' };
    }

    return {
      shouldReconnect: true,
      delay: this.calculateOptimalDelay(error),
      strategy: this.selectReconnectStrategy()
    };
  }

  /**
   * Reconnect delay: the category's base delay for the current attempt,
   * scaled by network quality, recent success rate and error severity.
   */
  private calculateOptimalDelay(error: ErrorClassification): number {
    const baseDelays = this.getRetryDelays(error);
    if (baseDelays.length === 0) {
      return 0;
    }

    // Attempts beyond the table reuse its last (longest) entry.
    const baseDelay = baseDelays[Math.min(this.attemptCount, baseDelays.length - 1)] ?? 0;
    const networkFactor = this.getNetworkQualityFactor();
    const historyFactor = this.getHistoryFactor();
    const severityFactor = this.getSeverityFactor(error.severity);

    return baseDelay * networkFactor * historyFactor * severityFactor;
  }

  /** Base delay schedule for the error's category (NETWORK for unknown categories). */
  private getRetryDelays(error: ErrorClassification): readonly number[] {
    const table = IntelligentReconnectEngine.RETRY_DELAYS;
    return table[error.category] ?? table.NETWORK ?? [];
  }

  /** Picks a reconnect posture from the recent success rate. */
  private selectReconnectStrategy(): ReconnectStrategy {
    const successRate = this.calculateSuccessRate();
    if (successRate > 0.8) {
      return 'AGGRESSIVE';
    } else if (successRate > 0.5) {
      return 'BALANCED';
    } else {
      return 'CONSERVATIVE';
    }
  }

  /**
   * Records the outcome of a reconnect attempt. Success resets the attempt
   * counter; every outcome restarts the cooldown clock and is kept in a
   * 20-entry success window.
   */
  recordReconnectResult(success: boolean): void {
    this.attemptCount = success ? 0 : this.attemptCount + 1;
    this.lastAttemptTime = Date.now();

    this.successHistory.push(success);
    if (this.successHistory.length > 20) {
      this.successHistory.shift();
    }
  }

  /** Share of successful attempts in the window; 1 when there is no history. */
  private calculateSuccessRate(): number {
    if (this.successHistory.length === 0) return 1;
    const successCount = this.successHistory.filter(Boolean).length;
    return successCount / this.successHistory.length;
  }

  private getNetworkQualityFactor(): number {
    const quality = this.networkQualityMonitor.getCachedQuality();
    switch (quality) {
      case 'EXCELLENT': return 0.8;
      case 'GOOD': return 1;
      case 'FAIR': return 1.5;
      case 'POOR': return 2;
      default: return 1;
    }
  }

  private getHistoryFactor(): number {
    const successRate = this.calculateSuccessRate();
    if (successRate > 0.8) return 0.7;
    if (successRate > 0.5) return 1;
    return 1.3;
  }

  private getSeverityFactor(severity: ErrorSeverity): number {
    switch (severity) {
      case 'LOW': return 0.8;
      case 'MEDIUM': return 1;
      case 'HIGH': return 1.5;
      default: return 1;
    }
  }

  private isCoolDownPeriodOver(): boolean {
    if (this.lastAttemptTime === 0) return true;
    const cooldown = this.calculateCooldownPeriod();
    return Date.now() - this.lastAttemptTime >= cooldown;
  }

  /** 1 s doubling per consecutive failure (1 s, 2 s, 4 s, ...), capped at MAX_COOLDOWN_MS. */
  private calculateCooldownPeriod(): number {
    return Math.min(
      IntelligentReconnectEngine.MAX_COOLDOWN_MS,
      1000 * Math.pow(2, this.attemptCount)
    );
  }

  private getRemainingCooldown(): number {
    const cooldown = this.calculateCooldownPeriod();
    const elapsed = Date.now() - this.lastAttemptTime;
    return Math.max(0, cooldown - elapsed);
  }

  /** True when the Battery Status API reports < 20 % charge while not charging. */
  private async isLowBattery(): Promise<boolean> {
    // The Battery Status API is browser-only and not universally supported.
    if (typeof navigator === 'undefined' || !('getBattery' in navigator)) return false;
    try {
      const battery = await (navigator as any).getBattery();
      return battery.level < 0.2 && !battery.charging;
    } catch {
      return false;
    }
  }
}

2.2 指数退避与抖动优化

传统的指数退避在客户端密集时可能引发“重连风暴”:大量客户端因同一故障同时断线后,会按完全相同的退避序列在同一时刻集中重试。引入随机抖动(Jitter)可以把这些重试分散到一个时间窗口内,这是分布式系统中的常见优化手段。

// Advanced backoff strategy
/**
 * Exponential backoff with random jitter.
 *
 * The raw delay for attempt `n` is `baseDelay * 2^n`, capped at `maxDelay`
 * (attempts at or beyond `maxAttempts` use `maxDelay` directly). A uniform
 * jitter of ±(jitterFactor / 2) × delay is then added so clients that failed
 * together do not retry together, and the result is clamped to [0, maxDelay].
 */
class AdvancedBackoffStrategy {
  private baseDelay: number;
  // Configured base delay, restored by reset().
  private readonly initialBaseDelay: number;
  private maxDelay: number;
  private maxAttempts: number;
  private jitterFactor: number;

  constructor(config: BackoffConfig) {
    this.baseDelay = config.baseDelay;
    this.initialBaseDelay = config.baseDelay;
    this.maxDelay = config.maxDelay;
    this.maxAttempts = config.maxAttempts;
    // `??` (not `||`) so that an explicit jitterFactor of 0 really disables jitter.
    this.jitterFactor = config.jitterFactor ?? 0.2;
  }

  /** Delay in ms before retry number `attempt` (0-based), jittered and clamped to [0, maxDelay]. */
  calculateDelay(attempt: number): number {
    const cappedDelay =
      attempt >= this.maxAttempts
        ? this.maxDelay
        : Math.min(this.baseDelay * Math.pow(2, attempt), this.maxDelay);
    // Jitter applies on the exhausted-attempts path too; otherwise every client
    // that ran out of attempts would retry at exactly maxDelay, in lock-step.
    const jittered = cappedDelay + this.calculateJitter(cappedDelay);
    return Math.min(this.maxDelay, Math.max(0, jittered));
  }

  /** Uniform random offset in [-delay * jitterFactor / 2, +delay * jitterFactor / 2). */
  private calculateJitter(delay: number): number {
    const jitterRange = delay * this.jitterFactor;
    return (Math.random() - 0.5) * jitterRange;
  }

  /**
   * Adapts the base delay to the observed success rate: shrinks it by 20 %
   * (floor 1000 ms) above 0.9, grows it by 20 % (ceiling 30000 ms) below 0.5.
   */
  adjustBasedOnSuccessRate(successRate: number): void {
    if (successRate > 0.9) {
      this.baseDelay = Math.max(1000, this.baseDelay * 0.8);
    } else if (successRate < 0.5) {
      this.baseDelay = Math.min(30000, this.baseDelay * 1.2);
    }
  }

  /** Undoes any success-rate adjustment by restoring the configured base delay. */
  reset(): void {
    this.baseDelay = this.initialBaseDelay;
  }

  /** Delays for attempts 0 .. maxAttempts - 1 (each independently jittered). */
  generateBackoffSequence(): number[] {
    const sequence: number[] = [];
    for (let i = 0; i < this.maxAttempts; i++) {
      sequence.push(this.calculateDelay(i));
    }
    return sequence;
  }
}

// Network quality monitor
/**
 * Estimates network quality from HTTP round-trip latency to `/ping` and from
 * packet-loss ratios fed in through `recordPacketLoss()`. Fresh latency
 * measurements are taken at most once every 5 seconds; in between, a grade
 * derived from the retained samples is returned.
 */
class NetworkQualityMonitor {
  private latencyMeasurements: number[] = [];
  private packetLossMeasurements: number[] = [];
  private lastMeasurementTime = 0;

  /** Current quality grade; measures anew only if the last measurement is older than 5 s. */
  async getQuality(): Promise<NetworkQuality> {
    if (Date.now() - this.lastMeasurementTime < 5000) {
      return this.getCachedQuality();
    }

    const metrics = await this.measureNetworkMetrics();
    this.updateMeasurements(metrics);
    this.lastMeasurementTime = Date.now();
    return this.calculateQualityScore(metrics);
  }

  /**
   * Records an externally observed packet-loss ratio, clamped to [0, 1];
   * non-finite values are ignored. The newest 20 samples are kept.
   */
  recordPacketLoss(ratio: number): void {
    if (!Number.isFinite(ratio)) return;
    this.packetLossMeasurements.push(Math.min(1, Math.max(0, ratio)));
    if (this.packetLossMeasurements.length > 20) {
      this.packetLossMeasurements.shift();
    }
  }

  private async measureNetworkMetrics(): Promise<NetworkMetrics> {
    const latency = await this.measureLatency();
    const packetLoss = await this.estimatePacketLoss();
    return { latency, packetLoss };
  }

  /**
   * Average of 3 sequential HEAD /ping round trips; a probe that fails or
   * exceeds its 5 s timeout counts as 5000 ms.
   */
  private async measureLatency(): Promise<number> {
    const samples = 3;
    const latencies: number[] = [];

    for (let i = 0; i < samples; i++) {
      const start = performance.now();
      try {
        await fetch('/ping', {
          method: 'HEAD',
          cache: 'no-cache',
          signal: AbortSignal.timeout(5000)
        });
        latencies.push(performance.now() - start);
      } catch {
        latencies.push(5000);
      }
    }

    return this.average(latencies);
  }

  /**
   * Mean of the recorded packet-loss samples (0 when none were recorded).
   * Estimates are no longer fed back into the samples, which previously
   * pinned the result at 0.
   */
  private async estimatePacketLoss(): Promise<number> {
    return this.average(this.packetLossMeasurements);
  }

  private calculateQualityScore(metrics: NetworkMetrics): NetworkQuality {
    const { latency, packetLoss } = metrics;
    if (latency < 100 && packetLoss < 0.01) return 'EXCELLENT';
    if (latency < 300 && packetLoss < 0.05) return 'GOOD';
    if (latency < 1000 && packetLoss < 0.1) return 'FAIR';
    return 'POOR';
  }

  /**
   * Grade from the retained samples without measuring: mean latency and mean
   * recorded packet loss. 'GOOD' when no latency has been measured yet.
   */
  getCachedQuality(): NetworkQuality {
    if (this.latencyMeasurements.length === 0) return 'GOOD';
    return this.calculateQualityScore({
      latency: this.average(this.latencyMeasurements),
      // Previously hard-coded to 0.05, which made GOOD/EXCELLENT unreachable.
      packetLoss: this.average(this.packetLossMeasurements)
    });
  }

  /** Keeps the newest 20 latency samples. */
  private updateMeasurements(metrics: NetworkMetrics): void {
    this.latencyMeasurements.push(metrics.latency);
    if (this.latencyMeasurements.length > 20) {
      this.latencyMeasurements.shift();
    }
  }

  /** Arithmetic mean; 0 for an empty list. */
  private average(values: readonly number[]): number {
    if (values.length === 0) return 0;
    return values.reduce((a, b) => a + b, 0) / values.length;
  }
}

三、心跳机制与连接健康监测

3.1 自适应心跳策略

固定频率的心跳在网络条件变化时效率低下。自适应心跳能根据网络延迟动态调整间隔,在节省资源和及时检测间取得平衡。

// Adaptive heartbeat manager
/**
 * Sends application-level heartbeats (`{ type: 'heartbeat', id, timestamp }`)
 * over a WebSocket and waits for a matching `heartbeat_ack`. The interval is
 * adapted to observed round-trip times. After `config.maxMissedBeats`
 * consecutive misses the manager stops and reports via
 * `config.onHeartbeatFailure`.
 */
class AdaptiveHeartbeatManager {
  private heartbeatInterval: number = 30000; // initial interval: 30 s
  private missedHeartbeats: number = 0;
  private lastHeartbeatTime: number = 0;
  private responseTimes: number[] = [];
  private isRunning: boolean = false;
  // `ReturnType<typeof setTimeout>` type-checks under both DOM and Node typings.
  private timeoutId?: ReturnType<typeof setTimeout>;

  constructor(
    private websocket: WebSocket,
    private config: HeartbeatConfig
  ) {}

  /** Starts the heartbeat loop; no-op if already running. */
  start(): void {
    if (this.isRunning) return;
    this.isRunning = true;
    this.lastHeartbeatTime = Date.now();
    this.scheduleNextHeartbeat();
  }

  /** Stops the loop. A heartbeat already in flight can no longer trigger recovery. */
  stop(): void {
    this.isRunning = false;
    if (this.timeoutId !== undefined) {
      clearTimeout(this.timeoutId);
      this.timeoutId = undefined;
    }
  }

  private scheduleNextHeartbeat(): void {
    if (!this.isRunning) return;
    // Replace any pending timer so at most one heartbeat chain is alive, even
    // after stop() + start() while a heartbeat was still in flight.
    if (this.timeoutId !== undefined) {
      clearTimeout(this.timeoutId);
    }
    this.timeoutId = setTimeout(() => {
      this.timeoutId = undefined;
      void this.sendHeartbeat();
    }, this.heartbeatInterval);
  }

  private async sendHeartbeat(): Promise<void> {
    if (this.websocket.readyState !== WebSocket.OPEN) {
      this.handleMissedHeartbeat();
      // Keep the loop alive; previously it silently ended here, so the
      // missed-beat threshold could never be reached.
      this.scheduleNextHeartbeat();
      return;
    }

    const heartbeatId = this.generateHeartbeatId();
    const startTime = Date.now();

    try {
      this.websocket.send(JSON.stringify({
        type: 'heartbeat',
        id: heartbeatId,
        timestamp: startTime
      }));

      // Resolves on the matching ack, rejects after config.timeout ms.
      await this.waitForHeartbeatResponse(heartbeatId);

      this.recordResponseTime(Date.now() - startTime);
      this.adjustHeartbeatInterval();
      this.missedHeartbeats = 0;
    } catch {
      this.handleMissedHeartbeat();
    } finally {
      this.scheduleNextHeartbeat();
    }
  }

  /**
   * Waits for `heartbeat_ack` with the given id. Uses a single timeout timer,
   * and removes both the timer and the message listener on either outcome.
   */
  private waitForHeartbeatResponse(heartbeatId: string): Promise<void> {
    return new Promise((resolve, reject) => {
      const messageHandler = (event: MessageEvent) => {
        let data: any;
        try {
          data = JSON.parse(event.data);
        } catch {
          return; // ignore non-JSON messages
        }
        if (data?.type === 'heartbeat_ack' && data.id === heartbeatId) {
          cleanup();
          resolve();
        }
      };

      const timer = setTimeout(() => {
        cleanup();
        reject(new Error('Heartbeat response timeout'));
      }, this.config.timeout);

      const cleanup = () => {
        clearTimeout(timer);
        this.websocket.removeEventListener('message', messageHandler);
      };

      this.websocket.addEventListener('message', messageHandler);
    });
  }

  private handleMissedHeartbeat(): void {
    // A heartbeat that was in flight when stop() ran must not trigger recovery.
    if (!this.isRunning) return;

    this.missedHeartbeats++;
    if (this.missedHeartbeats >= this.config.maxMissedBeats) {
      this.triggerConnectionRecovery();
    } else {
      // Probe more often to confirm the connection state sooner.
      this.heartbeatInterval = Math.max(
        this.config.minInterval,
        this.heartbeatInterval * 0.7
      );
    }
  }

  private triggerConnectionRecovery(): void {
    this.stop();
    this.config.onHeartbeatFailure?.({
      missedBeats: this.missedHeartbeats,
      avgResponseTime: this.getAverageResponseTime(),
      lastSuccessful: this.lastHeartbeatTime
    });
  }

  /** Keeps the newest 10 round-trip times and stamps the last successful beat. */
  private recordResponseTime(responseTime: number): void {
    this.responseTimes.push(responseTime);
    if (this.responseTimes.length > 10) {
      this.responseTimes.shift();
    }
    this.lastHeartbeatTime = Date.now();
  }

  /** Lengthens the interval on good links and shortens it on bad ones, within [minInterval, maxInterval]. */
  private adjustHeartbeatInterval(): void {
    const networkQuality = this.assessNetworkQuality();
    let newInterval: number;

    switch (networkQuality) {
      case 'EXCELLENT':
        newInterval = this.config.maxInterval;
        break;
      case 'GOOD':
        newInterval = this.heartbeatInterval;
        break;
      case 'FAIR':
        newInterval = Math.max(
          this.config.minInterval,
          this.heartbeatInterval * 0.8
        );
        break;
      case 'POOR':
        newInterval = this.config.minInterval;
        break;
      default:
        newInterval = this.heartbeatInterval;
    }

    this.heartbeatInterval = Math.max(
      this.config.minInterval,
      Math.min(this.config.maxInterval, newInterval)
    );
  }

  /** Grades the link from mean round-trip time and its standard deviation (both ms). */
  private assessNetworkQuality(): NetworkQuality {
    const avgResponseTime = this.getAverageResponseTime();
    const stdDev = this.calculateResponseTimeStdDev();

    if (avgResponseTime < 100 && stdDev < 50) return 'EXCELLENT';
    if (avgResponseTime < 300 && stdDev < 100) return 'GOOD';
    if (avgResponseTime < 1000 && stdDev < 300) return 'FAIR';
    return 'POOR';
  }

  private getAverageResponseTime(): number {
    if (this.responseTimes.length === 0) return 0;
    return this.responseTimes.reduce((a, b) => a + b, 0) / this.responseTimes.length;
  }

  /** Population standard deviation of the retained response times; 0 with fewer than 2 samples. */
  private calculateResponseTimeStdDev(): number {
    if (this.responseTimes.length < 2) return 0;
    const avg = this.getAverageResponseTime();
    const variance = this.responseTimes.reduce((acc, time) => {
      return acc + Math.pow(time - avg, 2);
    }, 0) / this.responseTimes.length;
    return Math.sqrt(variance);
  }

  private generateHeartbeatId(): string {
    return `heartbeat_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Snapshot of the manager's state. */
  getStatus(): HeartbeatStatus {
    return {
      isRunning: this.isRunning,
      interval: this.heartbeatInterval,
      missedBeats: this.missedHeartbeats,
      avgResponseTime: this.getAverageResponseTime(),
      lastHeartbeatTime: this.lastHeartbeatTime
    };
  }
}

3.2 连接健康综合评估

单一指标无法全面反映连接健康状况。需要从连通性、稳定性、性能等多维度进行综合评分。

// Connection health assessor
/**
 * Scores a connection on connectivity, stability, performance and
 * reliability, folds the four scores into one health grade and keeps the
 * assessments of the last hour.
 */
class ConnectionHealthAssessor {
  private metrics: HealthMetrics = {
    latency: [],
    packetLoss: [],
    uptime: 0,
    errorRate: 0,
    reconnects: 0
  };
  private assessmentHistory: HealthAssessment[] = [];

  /** Runs one assessment, records it, and returns the overall grade. */
  assessConnectionHealth(connection: WebSocketConnection): HealthScore {
    const current = this.collectCurrentMetrics(connection);
    const trend = this.analyzeHistoricalTrend();
    const network = this.assessNetworkConditions();

    const scores = {
      connectivity: this.calculateConnectivityScore(current),
      stability: this.calculateStabilityScore(trend),
      performance: this.calculatePerformanceScore(current),
      reliability: this.calculateReliabilityScore(trend)
    };

    const grade = this.calculateOverallScore(scores, network);
    this.assessmentHistory.push({
      timestamp: Date.now(),
      scores,
      overallScore: grade,
      recommendations: this.generateRecommendations(scores, grade),
      riskLevel: this.determineRiskLevel(grade)
    });
    this.cleanupOldAssessments();
    return grade;
  }

  /** 100 minus tiered penalties for mean latency (ms) and packet-loss ratio; floored at 0. */
  private calculateConnectivityScore(metrics: CurrentMetrics): number {
    const samples = metrics.latency;
    const meanLatency = samples.length === 0
      ? 0
      : samples.reduce((sum, value) => sum + value) / samples.length;

    const latencyPenalty = this.tieredPenalty(meanLatency, [[1000, 40], [500, 20], [200, 10]]);
    const lossPenalty = this.tieredPenalty(metrics.packetLoss, [[0.1, 30], [0.05, 15], [0.01, 5]]);
    return Math.max(0, 100 - latencyPenalty - lossPenalty);
  }

  /** 100 minus penalties for hourly reconnect rate and a rising error rate; floored at 0. */
  private calculateStabilityScore(trend: HistoricalTrend): number {
    const reconnectPenalty = this.tieredPenalty(trend.reconnectsPerHour, [[10, 50], [5, 25], [2, 10]]);
    const errorTrendPenalty = trend.errorRateIncreasing ? 20 : 0;
    return Math.max(0, 100 - reconnectPenalty - errorTrendPenalty);
  }

  /**
   * Penalty of the first tier whose threshold `value` exceeds; 0 if none.
   * Tiers must be ordered from the highest threshold down.
   */
  private tieredPenalty(value: number, tiers: ReadonlyArray<readonly [number, number]>): number {
    const hit = tiers.find(([threshold]) => value > threshold);
    return hit ? hit[1] : 0;
  }

  /**
   * Weighted mean of the category scores (connectivity/stability 0.3,
   * performance/reliability 0.2), discounted for POOR (x0.8) or FAIR (x0.9)
   * network conditions, then mapped to a grade at 90 / 75 / 60.
   */
  private calculateOverallScore(
    scores: HealthScores,
    network: NetworkConditions
  ): HealthScore {
    const weights = {
      connectivity: 0.3,
      stability: 0.3,
      performance: 0.2,
      reliability: 0.2
    };

    const totals = Object.entries(scores).reduce(
      (acc, [category, score]) => {
        const weight = weights[category as keyof typeof weights];
        return {
          weightedSum: acc.weightedSum + score * weight,
          totalWeight: acc.totalWeight + weight
        };
      },
      { weightedSum: 0, totalWeight: 0 }
    );

    const networkMultiplier =
      network.quality === 'POOR' ? 0.8 : network.quality === 'FAIR' ? 0.9 : 1;
    const overall = (totals.weightedSum / totals.totalWeight) * networkMultiplier;

    const gradeFloors: Array<[number, HealthScore]> = [
      [90, 'EXCELLENT'],
      [75, 'GOOD'],
      [60, 'FAIR']
    ];
    const match = gradeFloors.find(([floor]) => overall >= floor);
    return match ? match[1] : 'POOR';
  }

  /** Human-readable suggestions for every weak category (score < 70) and for a POOR grade. */
  private generateRecommendations(scores: HealthScores, overall: HealthScore): string[] {
    const rules: Array<[boolean, string]> = [
      [scores.connectivity < 70, '考虑优化网络连接或启用压缩'],
      [scores.stability < 70, '检查重连策略,可能需要调整退避参数'],
      [scores.performance < 70, '优化消息大小和频率,减少带宽使用'],
      [overall === 'POOR', '建议切换到降级模式或备用传输方式']
    ];
    return rules.filter(([applies]) => applies).map(([, text]) => text);
  }

  /** EXCELLENT/GOOD -> LOW, FAIR -> MEDIUM, POOR -> HIGH; anything else MEDIUM. */
  private determineRiskLevel(score: HealthScore): RiskLevel {
    const riskByGrade = new Map<string, RiskLevel>([
      ['EXCELLENT', 'LOW'],
      ['GOOD', 'LOW'],
      ['FAIR', 'MEDIUM'],
      ['POOR', 'HIGH']
    ]);
    return riskByGrade.get(score) ?? 'MEDIUM';
  }

  /** Drops assessments older than one hour. */
  private cleanupOldAssessments(): void {
    const cutoff = Date.now() - 3600000;
    this.assessmentHistory = this.assessmentHistory.filter(({ timestamp }) => timestamp > cutoff);
  }
}

四、降级方案与备用通道

4.1 多级降级策略

当WebSocket不可用时,系统应能平滑降级而非完全崩溃。设计多级降级路径,优先保障核心功能。

// Tiered graceful-degradation manager
/**
 * Moves along a fixed degradation ladder (FULL_FUNCTIONALITY -> ... ->
 * OFFLINE_MODE) when performance issues are detected, and back up one level
 * at a time once conditions improve, switching fallback transports on and
 * off as the target level requires.
 */
class GracefulDegradationManager {
  private currentLevel: DegradationLevel = 'FULL_FUNCTIONALITY';
  private degradationPath: DegradationPath;
  private fallbackChannels: FallbackChannel[] = [];
  /** Fallback channel activated by the most recent switch, if any. */
  private activeFallbackChannel?: FallbackChannel;

  constructor(private config: DegradationConfig) {
    this.degradationPath = this.buildDegradationPath();
    this.initializeFallbackChannels();
  }

  /**
   * Degradation ladder: each level names the next (lower) level, the
   * conditions that push towards it, and the features it keeps.
   * OFFLINE_MODE points back to FULL_FUNCTIONALITY for when connectivity returns.
   */
  private buildDegradationPath(): DegradationPath {
    return {
      'FULL_FUNCTIONALITY': {
        next: 'REDUCED_FREQUENCY',
        conditions: ['HIGH_LATENCY', 'HIGH_ERROR_RATE'],
        features: ['realtime_messaging', 'large_messages', 'binary_data']
      },
      'REDUCED_FREQUENCY': {
        next: 'IMPORTANT_ONLY',
        conditions: ['PERSISTENT_HIGH_LATENCY', 'FREQUENT_DISCONNECTS'],
        features: ['realtime_messaging', 'important_messages']
      },
      'IMPORTANT_ONLY': {
        next: 'LONG_POLLING',
        conditions: ['WEBSOCKET_UNRELIABLE', 'HIGH_PACKET_LOSS'],
        features: ['important_messages', 'ack_required']
      },
      'LONG_POLLING': {
        next: 'BATCH_MODE',
        conditions: ['LONG_POLLING_UNRELIABLE'],
        features: ['important_messages', 'batched_delivery']
      },
      'BATCH_MODE': {
        next: 'OFFLINE_MODE',
        conditions: ['NO_CONNECTIVITY'],
        features: ['cached_messages', 'offline_operation']
      },
      'OFFLINE_MODE': {
        next: 'FULL_FUNCTIONALITY',
        conditions: ['CONNECTIVITY_RESTORED'],
        features: ['cached_messages', 'local_operation']
      }
    };
  }

  /**
   * Decides whether to change level. With no detected issues it considers an
   * upgrade instead; note that an upgrade is also reported as
   * `shouldDegrade: true` with the higher level as `targetLevel`.
   */
  async evaluateDegradationNeeded(metrics: PerformanceMetrics): Promise<DegradationDecision> {
    const issues = this.identifyPerformanceIssues(metrics);
    if (issues.length === 0) {
      return this.considerUpgrade();
    }

    const recommendedLevel = this.determineRecommendedLevel(issues);
    if (this.shouldDegrade(recommendedLevel)) {
      return {
        shouldDegrade: true,
        targetLevel: recommendedLevel,
        reason: issues.join(', '),
        estimatedRecovery: this.estimateRecoveryTime(issues)
      };
    }

    return { shouldDegrade: false };
  }

  /**
   * Switches to `targetLevel`: prepares, swaps feature sets, activates or
   * releases fallback channels, then emits a degradation event.
   */
  async executeDegradation(targetLevel: DegradationLevel): Promise<void> {
    // Captured up front: the event must report the level we are leaving.
    const previousLevel = this.currentLevel;
    console.log(`Degrading from ${previousLevel} to ${targetLevel}`);

    await this.prepareForDegradation(targetLevel);
    await this.deactivateCurrentFeatures();
    await this.activateTargetFeatures(targetLevel);

    const needsFallback = this.requiresFallbackChannel(targetLevel);
    if (needsFallback) {
      await this.switchToFallbackChannel(targetLevel);
    }
    // Stop a fallback transport the new level no longer uses; otherwise it keeps
    // running alongside the restored WebSocket path after an upgrade.
    await this.releaseUnusedFallbackChannel(targetLevel, needsFallback);

    this.currentLevel = targetLevel;

    this.notifyDegradationEvent({
      from: previousLevel,
      to: targetLevel,
      timestamp: Date.now(),
      automatic: true
    });
  }

  /** Proposes moving up one level when the upgrade conditions hold. */
  private async considerUpgrade(): Promise<DegradationDecision> {
    if (this.currentLevel === 'FULL_FUNCTIONALITY') {
      return { shouldDegrade: false };
    }

    const upgradeConditions = this.getUpgradeConditions();
    const canUpgrade = await this.checkUpgradeConditions(upgradeConditions);

    if (canUpgrade) {
      const nextLevel = this.getNextUpgradeLevel();
      return {
        shouldDegrade: true, // actually an upgrade; see evaluateDegradationNeeded
        targetLevel: nextLevel,
        reason: 'Conditions improved',
        estimatedRecovery: 0
      };
    }

    return { shouldDegrade: false };
  }

  /**
   * The level one step above the current one, found by searching the ladder
   * for the level whose `next` is the current level. Falls back to
   * FULL_FUNCTIONALITY. Not meaningful while already at FULL_FUNCTIONALITY
   * (the ladder's wrap-around would yield OFFLINE_MODE); considerUpgrade
   * returns early in that case.
   */
  private getNextUpgradeLevel(): DegradationLevel {
    for (const [level, config] of Object.entries(this.degradationPath)) {
      if (config.next === this.currentLevel) {
        return level as DegradationLevel;
      }
    }
    return 'FULL_FUNCTIONALITY';
  }

  private initializeFallbackChannels(): void {
    this.fallbackChannels = [
      new LongPollingChannel(this.config.longPollingConfig),
      new ServerSentEventsChannel(this.config.sseConfig),
      new HTTPBatchChannel(this.config.batchConfig)
    ];
  }

  /**
   * Activates the first channel that supports `level`, in registration order,
   * unless the active channel already supports it. A previously active
   * channel is deactivated once its replacement is up. Logs an error if no
   * channel could be activated.
   */
  private async switchToFallbackChannel(level: DegradationLevel): Promise<void> {
    const current = this.activeFallbackChannel;
    if (current?.supportsLevel(level)) {
      return;
    }

    const suitableChannels = this.fallbackChannels.filter(
      channel => channel.supportsLevel(level)
    );

    for (const channel of suitableChannels) {
      try {
        await channel.activate();
        console.log(`Switched to fallback channel: ${channel.name}`);
        this.activeFallbackChannel = channel;
        if (current && current !== channel) {
          await this.deactivateFallbackChannel(current);
        }
        return;
      } catch (error) {
        console.warn(`Failed to activate ${channel.name}:`, error);
      }
    }

    console.error(`No fallback channel could be activated for level ${level}`);
  }

  /** Deactivates the active fallback channel unless the target level still needs it. */
  private async releaseUnusedFallbackChannel(
    level: DegradationLevel,
    needsFallback: boolean
  ): Promise<void> {
    const channel = this.activeFallbackChannel;
    if (!channel) return;
    if (needsFallback && channel.supportsLevel(level)) return;
    await this.deactivateFallbackChannel(channel);
  }

  /** Best-effort deactivation: failures are logged, not thrown. */
  private async deactivateFallbackChannel(channel: FallbackChannel): Promise<void> {
    try {
      await channel.deactivate();
    } catch (error) {
      console.warn(`Failed to deactivate ${channel.name}:`, error);
    }
    if (this.activeFallbackChannel === channel) {
      this.activeFallbackChannel = undefined;
    }
  }
}

// Base class for fallback transport channels
/**
 * Contract for a non-WebSocket transport that GracefulDegradationManager can
 * switch to while degraded (long polling, server-sent events, HTTP batching).
 */
abstract class FallbackChannel {
  /** Channel name; GracefulDegradationManager includes it in its log messages. */
  abstract name: string;

  /** Whether the current runtime provides what this channel needs. */
  abstract isAvailable(): boolean;

  /** Whether this channel can serve the given degradation level. */
  abstract supportsLevel(level: DegradationLevel): boolean;

  /** Starts the channel; expected to reject when it cannot be started. */
  abstract activate(): Promise<void>;

  /** Stops the channel. */
  abstract deactivate(): Promise<void>;

  /** Sends one application message through the channel. */
  abstract send(message: any): Promise<void>;
}

// Long-polling fallback channel
/**
 * Fallback transport that receives messages by repeatedly long-polling an
 * HTTP endpoint and sends messages with plain JSON POST requests.
 *
 * At most one polling loop is live at a time. `deactivate()` aborts the poll
 * request in flight instead of waiting for it to time out.
 */
class LongPollingChannel extends FallbackChannel {
  name = 'LongPolling';
  private isActive = false;
  private polling = false;
  /** Bumped on every start/stop so a superseded polling loop knows to exit. */
  private pollGeneration = 0;
  /** Controller of the poll request currently in flight, if any. */
  private pollAbortController?: AbortController;

  /**
   * @param config Optional settings. `onMessage` receives each polled message.
   *   `sendUrl` defaults to '/api/messages'. `pollUrl` defaults to '/api/messages/poll'.
   *   `pollTimeoutMs` defaults to 30000 and bounds each poll request.
   *   `retryDelayMs` defaults to 5000 and is the pause after a failed poll.
   */
  constructor(
    private readonly config: {
      onMessage?: (message: any) => void;
      sendUrl?: string;
      pollUrl?: string;
      pollTimeoutMs?: number;
      retryDelayMs?: number;
    } = {}
  ) {
    super();
  }

  supportsLevel(level: DegradationLevel): boolean {
    return ['LONG_POLLING', 'BATCH_MODE'].includes(level);
  }

  /** Starts polling in the background; no-op if already active. */
  async activate(): Promise<void> {
    if (this.isActive) return;
    this.isActive = true;
    // Fire-and-forget: the loop handles its own errors and runs until deactivated.
    void this.startPolling();
  }

  /** Stops polling and aborts the request in flight. */
  async deactivate(): Promise<void> {
    this.isActive = false;
    this.polling = false;
    this.pollGeneration++;
    this.pollAbortController?.abort();
    this.pollAbortController = undefined;
  }

  /**
   * POSTs `message` as JSON.
   * @throws Error when the server answers with a non-2xx status.
   */
  async send(message: any): Promise<void> {
    const response = await fetch(this.config.sendUrl ?? '/api/messages', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(message)
    });

    if (!response.ok) {
      throw new Error(`HTTP request failed: ${response.status}`);
    }
  }

  isAvailable(): boolean {
    return typeof fetch !== 'undefined';
  }

  private async startPolling(): Promise<void> {
    if (!this.isActive || this.polling) return;

    this.polling = true;
    const generation = ++this.pollGeneration;
    // False once deactivate() ran or a newer loop was started.
    const isCurrent = (): boolean =>
      this.isActive && this.polling && generation === this.pollGeneration;
    const pollUrl = this.config.pollUrl ?? '/api/messages/poll';
    const pollTimeoutMs = this.config.pollTimeoutMs ?? 30000;
    const retryDelayMs = this.config.retryDelayMs ?? 5000;

    while (isCurrent()) {
      const controller = new AbortController();
      this.pollAbortController = controller;
      const timeoutId = setTimeout(() => controller.abort(), pollTimeoutMs);
      try {
        const response = await fetch(pollUrl, { signal: controller.signal });
        if (!isCurrent()) break;

        if (response.ok) {
          // 204 No Content has no body to parse.
          if (response.status !== 204) {
            this.handleMessages(await response.json());
          }
        } else {
          // Back off on HTTP errors instead of re-polling in a tight loop.
          console.warn(`Long polling failed: HTTP ${response.status}`);
          await this.delay(retryDelayMs);
        }
      } catch (error) {
        if (!isCurrent()) break; // aborted by deactivate()
        console.warn('Long polling error:', error);
        await this.delay(retryDelayMs);
      } finally {
        clearTimeout(timeoutId);
        if (this.pollAbortController === controller) {
          this.pollAbortController = undefined;
        }
      }
    }
  }

  /** Forwards each polled message to `config.onMessage`; non-array payloads are ignored. */
  private handleMessages(messages: unknown): void {
    if (!Array.isArray(messages)) {
      console.warn('Long polling: ignoring non-array payload');
      return;
    }
    messages.forEach(message => {
      this.config.onMessage?.(message);
    });
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

4.2 离线队列与消息持久化

在网络不稳定或完全离线时,消息不应丢失。实现本地队列与持久化存储是保障可靠性的最后防线。

// Offline message queue
/**
 * Priority-ordered queue of outbound messages, persisted through PersistentStorage
 * (namespace 'offline_messages', key 'message_queue') so messages survive reloads.
 *
 * A message leaves the queue when:
 * - it is sent (or acknowledged, when `requiresAck` is set);
 * - it expires; or
 * - it reaches `config.maxRetries` attempts.
 *
 * Failed sends are retried with exponential backoff.
 *
 * NOTE(review): waitForAck, sendViaWebSocket, sendViaHttp, isWebSocketAvailable and
 * isHttpAvailable are called but not defined in this listing.
 */
class OfflineMessageQueue {
  private queue: QueuedMessage[] = [];
  private storage: PersistentStorage;
  private isProcessing = false;
  // Settles once persisted messages have been merged into `queue` (never rejects).
  private readonly ready: Promise<void>;

  constructor(private config: QueueConfig) {
    this.storage = new PersistentStorage('offline_messages');
    this.ready = this.loadPersistedMessages();
  }

  /**
   * Adds a message to the queue, persists the queue and, when online, starts processing.
   * @returns the generated queue id of the message
   */
  async enqueue(message: any, options: QueueOptions = {}): Promise<string> {
    // Wait for restored messages first so the persist below cannot overwrite them.
    await this.ready;

    const now = Date.now();
    const queuedMessage: QueuedMessage = {
      id: this.generateMessageId(),
      message,
      timestamp: now,
      attempts: 0,
      priority: options.priority || 'normal',
      requiresAck: options.requiresAck || false,
      expiration: options.expiration || now + 3600000, // default: expire after 1 hour
      status: 'pending'
    };

    this.queue.push(queuedMessage);
    this.sortQueueByPriority();
    await this.persistQueue();

    // Try to send right away if online; failures are logged, not thrown at the caller.
    if (navigator.onLine) {
      void this.processQueue().catch(error => {
        console.error('Error processing message queue:', error);
      });
    }

    return queuedMessage.id;
  }

  /**
   * Drains the queue while online, one message at a time, highest priority first.
   * Only one drain runs at a time; concurrent calls return immediately.
   */
  async processQueue(): Promise<void> {
    await this.ready;
    if (this.isProcessing || this.queue.length === 0) {
      return;
    }

    this.isProcessing = true;

    try {
      while (this.queue.length > 0 && navigator.onLine) {
        const message = this.queue[0];
        if (!message) break;

        if (this.isMessageExpired(message)) {
          this.removeFromQueue(message);
          this.config.onExpired?.(message);
          continue;
        }

        if (message.attempts >= this.config.maxRetries) {
          this.removeFromQueue(message);
          this.config.onMaxRetries?.(message);
          continue;
        }

        try {
          await this.sendMessage(message);
          message.attempts++;

          if (!message.requiresAck) {
            // Remove by identity, not shift(): enqueue() may have inserted and re-sorted
            // a higher-priority message while sendMessage() was awaited.
            this.removeFromQueue(message);
            this.config.onSuccess?.(message);
          } else {
            // acknowledgeMessage() removes the message once the ACK arrives.
            await this.waitForAck(message);
          }
        } catch (error) {
          message.attempts++;
          message.lastError = error instanceof Error ? error.message : String(error);
          await this.delay(this.calculateRetryDelay(message.attempts));
        }
      }
    } finally {
      this.isProcessing = false;
      await this.persistQueue();
    }
  }

  // Sends via WebSocket when possible, otherwise via HTTP; throws when neither is usable.
  private async sendMessage(queuedMessage: QueuedMessage): Promise<void> {
    if (this.isWebSocketAvailable()) {
      await this.sendViaWebSocket(queuedMessage);
    } else if (this.isHttpAvailable()) {
      await this.sendViaHttp(queuedMessage);
    } else {
      throw new Error('No available transport');
    }
  }

  /** Marks a queued message as delivered and removes it; unknown ids are ignored. */
  async acknowledgeMessage(messageId: string): Promise<void> {
    const messageIndex = this.queue.findIndex(msg => msg.id === messageId);
    if (messageIndex !== -1) {
      const message = this.queue[messageIndex];
      message.status = 'delivered';
      this.queue.splice(messageIndex, 1);
      await this.persistQueue();
      this.config.onAcknowledged?.(message);
    }
  }

  // Exponential backoff from the first configured delay (default 1s), capped at maxRetryDelay (default 30s).
  private calculateRetryDelay(attempt: number): number {
    const baseDelay = this.config.retryDelays?.[0] || 1000;
    const maxDelay = this.config.maxRetryDelay || 30000;
    return Math.min(maxDelay, baseDelay * Math.pow(2, attempt - 1));
  }

  /**
   * Orders by priority (critical < high < normal < low), then by enqueue time.
   * The comparison is case-insensitive, so 'CRITICAL' ranks like 'critical'. Unknown
   * priorities rank as 'normal' instead of producing NaN comparisons.
   */
  private sortQueueByPriority(): void {
    const priorityOrder = new Map<string, number>([
      ['critical', 0],
      ['high', 1],
      ['normal', 2],
      ['low', 3]
    ]);
    const rank = (priority: unknown): number =>
      priorityOrder.get(String(priority).toLowerCase()) ?? 2;
    this.queue.sort(
      (a, b) => rank(a.priority) - rank(b.priority) || a.timestamp - b.timestamp
    );
  }

  // Removes the given message by identity; a no-op if it is already gone.
  private removeFromQueue(target: QueuedMessage): void {
    const index = this.queue.indexOf(target);
    if (index !== -1) {
      this.queue.splice(index, 1);
    }
  }

  private async persistQueue(): Promise<void> {
    await this.storage.set('message_queue', this.queue);
  }

  /**
   * Restores persisted messages, merging them with anything already queued rather than
   * overwriting it. Storage failures are logged so `ready` always settles.
   */
  private async loadPersistedMessages(): Promise<void> {
    try {
      const stored = await this.storage.get('message_queue');
      if (!Array.isArray(stored)) {
        return;
      }
      const knownIds = new Set(this.queue.map(msg => msg.id));
      const restored = (stored as QueuedMessage[]).filter(msg => !knownIds.has(msg.id));
      this.queue = [...restored, ...this.queue];
      this.sortQueueByPriority();
    } catch (error) {
      console.error('Failed to load persisted offline messages:', error);
    }
  }

  // Id of the form msg_<epoch ms>_<9 random base-36 chars>.
  private generateMessageId(): string {
    return `msg_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  private isMessageExpired(message: QueuedMessage): boolean {
    return Date.now() > message.expiration;
  }

  // Resolves after `ms` milliseconds.
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

五、完整WebSocket客户端实现

5.1 健壮的WebSocket包装器

整合上述所有模块,构建一个面向生产环境的完整客户端。利用TypeScript的强类型可以大大提高代码的健壮性和可维护性。

// Robust WebSocket client
/**
 * WebSocket client that combines several components:
 * - the connection state machine;
 * - the reconnect engine;
 * - the adaptive heartbeat;
 * - graceful degradation;
 * - the offline message queue.
 *
 * Events passed to emit(): 'connected', 'reconnected', 'disconnected', 'error',
 * 'message', 'message_error', 'send_error'.
 *
 * NOTE(review): the following are called but not defined in this listing:
 * buildWebSocketUrl, waitForConnection, serializeData, parseMessage, waitForAck,
 * classifyError, collectPerformanceMetrics, flushMessageQueue, delay,
 * initializeEventListeners and setupNetworkMonitoring.
 */
class RobustWebSocketClient {
  private ws: WebSocket | null = null;
  private stateMachine: WebSocketStateMachine;
  private reconnectEngine: IntelligentReconnectEngine;
  // Created per successful connection; undefined before the first one.
  private heartbeatManager?: AdaptiveHeartbeatManager;
  private degradationManager: GracefulDegradationManager;
  private messageQueue: OfflineMessageQueue;
  // Payloads are `any` because each event carries a different shape.
  private eventListeners: Map<string, Array<(data: any) => void>> = new Map();
  // Set by close() so a pending automatic reconnection does not reopen the socket.
  private closeRequested = false;

  constructor(private config: WebSocketConfig) {
    this.stateMachine = new WebSocketStateMachine();
    this.reconnectEngine = new IntelligentReconnectEngine(config.reconnect);
    this.degradationManager = new GracefulDegradationManager(config.degradation);
    this.messageQueue = new OfflineMessageQueue(config.queue);
    this.initializeEventListeners();
    this.setupNetworkMonitoring();
  }

  /**
   * Opens a connection on explicit request.
   * @throws Error when the current state does not allow connecting, or when the attempt fails.
   */
  async connect(): Promise<void> {
    if (!this.stateMachine.canPerformAction('RECONNECT')) {
      throw new Error(`Cannot connect from state: ${this.stateMachine.state}`);
    }
    this.closeRequested = false;
    await this.openConnection('user_initiated');
  }

  /**
   * Sends `data` over the socket, or queues it when sending is not currently allowed
   * (unless `options.queueIfOffline === false`, in which case it throws).
   * A failed send is handed to handleSendFailure() rather than rethrown.
   */
  async send(data: any, options: SendOptions = {}): Promise<void> {
    if (!this.stateMachine.canPerformAction('SEND')) {
      if (options.queueIfOffline === false) {
        throw new Error(`Cannot send message in state: ${this.stateMachine.state}`);
      }
      // The queue id is not part of this method's Promise<void> contract.
      await this.messageQueue.enqueue(data, options);
      return;
    }

    try {
      const socket = this.ws;
      if (!socket) {
        throw new Error('WebSocket is not open');
      }
      socket.send(this.serializeData(data));
      if (options.requiresAck) {
        await this.waitForAck(data.id);
      }
    } catch (error) {
      await this.handleSendFailure(data, this.toError(error), options);
    }
  }

  /**
   * Closes the connection on purpose: it stops the heartbeat, flushes queued messages
   * and closes the socket without triggering automatic reconnection.
   * @param code close code; a falsy value sends 1000 (normal closure)
   */
  async close(code?: number, reason?: string): Promise<void> {
    this.stateMachine.transition('DISCONNECTING', 'user_initiated');
    this.closeRequested = true;
    this.heartbeatManager?.stop();
    await this.flushMessageQueue();

    const socket = this.ws;
    if (socket) {
      // Detach first so this deliberate close is not handled as an unexpected drop.
      this.detachWebSocketEventHandlers(socket);
      socket.close(code || 1000, reason);
      this.ws = null;
    }
    // NOTE(review): the state table lists DISCONNECTED's valid sources as
    // CONNECTED/RECONNECTING/ERROR only (not DISCONNECTING), so attempt it leniently.
    this.safeTransition('DISCONNECTED', 'normal_closure');
  }

  /**
   * One connection attempt: CONNECTING -> open -> CONNECTED, or ERROR on failure.
   * It is shared by connect() and automatic reconnection. Reconnection starts in the
   * RECONNECTING state and so cannot pass connect()'s RECONNECT permission check.
   */
  private async openConnection(reason: string): Promise<void> {
    this.stateMachine.transition('CONNECTING', reason);
    try {
      const socket = new WebSocket(this.buildWebSocketUrl(), this.config.protocols);
      this.ws = socket;
      this.setupWebSocketEventHandlers();
      await this.waitForConnection();

      // Use the local reference: close() may have cleared this.ws while we waited.
      this.heartbeatManager = new AdaptiveHeartbeatManager(socket, this.config.heartbeat);
      this.heartbeatManager.start();

      this.stateMachine.transition('CONNECTED', 'connection_established');
    } catch (error) {
      // onerror may already have moved us to ERROR; a repeated ERROR transition must not
      // replace the original failure with an "Invalid transition" error.
      this.safeTransition('ERROR', `connection_failed: ${this.toError(error).message}`);
      throw error;
    }
  }

  /**
   * Queues recoverable failures (keeping the caller's priority, else 'high') and reports
   * unrecoverable ones via 'send_error'. Either way, it then re-evaluates degradation.
   */
  private async handleSendFailure(
    data: any,
    error: Error,
    options: SendOptions
  ): Promise<void> {
    const errorClassification = this.classifyError(error);

    if (errorClassification.recoverable) {
      await this.messageQueue.enqueue(data, {
        ...options,
        priority: options.priority || 'high'
      });
    } else {
      this.emit('send_error', { data, error, options });
    }

    await this.degradeIfNeeded();
  }

  // Asks the degradation manager whether current metrics warrant degrading, and applies it.
  private async degradeIfNeeded(): Promise<void> {
    const metrics = await this.collectPerformanceMetrics();
    const decision = await this.degradationManager.evaluateDegradationNeeded(metrics);
    if (decision.shouldDegrade) {
      await this.degradationManager.executeDegradation(decision.targetLevel);
    }
  }

  private setupWebSocketEventHandlers(): void {
    if (!this.ws) return;

    this.ws.onopen = (event) => {
      this.handleOpen(event);
    };

    this.ws.onmessage = (event) => {
      this.handleMessage(event);
    };

    this.ws.onclose = (event) => {
      this.handleClose(event);
    };

    this.ws.onerror = (event) => {
      this.handleError(event);
    };
  }

  // Removes every handler installed by setupWebSocketEventHandlers().
  private detachWebSocketEventHandlers(socket: WebSocket): void {
    socket.onopen = null;
    socket.onmessage = null;
    socket.onclose = null;
    socket.onerror = null;
  }

  private handleOpen(event: Event): void {
    this.emit('connected', event);
    // Flush messages queued while disconnected.
    this.messageQueue.processQueue().catch(error => {
      console.error('Error processing message queue:', error);
    });
  }

  /**
   * Routes incoming frames:
   * - heartbeat acks go to the heartbeat manager;
   * - acks go to the offline queue;
   * - everything else is emitted as 'message'.
   * Parse failures are emitted as 'message_error'.
   */
  private handleMessage(event: MessageEvent): void {
    try {
      const data = this.parseMessage(event.data);

      if (data.type === 'heartbeat_ack') {
        this.heartbeatManager?.recordResponseTime(Date.now() - data.timestamp);
        return;
      }

      if (data.type === 'ack') {
        this.messageQueue.acknowledgeMessage(data.messageId).catch(error => {
          console.error('Error acknowledging message:', error);
        });
        return;
      }

      this.emit('message', data);
    } catch (error) {
      this.emit('message_error', { event, error });
    }
  }

  private handleClose(event: CloseEvent): void {
    this.heartbeatManager?.stop();

    const errorClassification = this.classifyError(event);
    this.emit('disconnected', {
      code: event.code,
      reason: event.reason,
      wasClean: event.wasClean,
      classification: errorClassification
    });

    // Event callbacks cannot propagate rejections, so log them here.
    this.handleReconnection(event, errorClassification).catch(error => {
      console.error('Automatic reconnection failed:', error);
    });
  }

  private handleError(event: Event): void {
    const errorClassification = this.classifyError(event);
    this.emit('error', {
      event,
      classification: errorClassification
    });
    // Lenient: throwing from an onerror callback would only produce an uncaught exception.
    this.safeTransition('ERROR', `websocket_error: ${errorClassification.category}`);
  }

  /**
   * Reconnects after a delay when the reconnect engine says so, emitting 'reconnected'
   * on success. Otherwise it moves to DISCONNECTED and re-evaluates degradation.
   * @throws the connection error when the reconnection attempt fails
   */
  private async handleReconnection(
    closeEvent: CloseEvent,
    classification: ErrorClassification
  ): Promise<void> {
    const decision = await this.reconnectEngine.shouldReconnect(classification);

    if (!decision.shouldReconnect) {
      this.safeTransition('DISCONNECTED', `reconnection_not_attempted: ${decision.reason}`);
      await this.degradeIfNeeded();
      return;
    }

    this.stateMachine.transition('RECONNECTING', 'automatic_reconnection');
    if (decision.delay && decision.delay > 0) {
      await this.delay(decision.delay);
    }
    // close() was called while we were waiting: do not reopen the connection.
    if (this.closeRequested) {
      return;
    }

    try {
      await this.openConnection('automatic_reconnection');
    } catch (error) {
      this.reconnectEngine.recordReconnectResult(false);
      throw error;
    }
    this.reconnectEngine.recordReconnectResult(true);
    this.emit('reconnected');
  }

  // Applies a state transition, logging instead of throwing when it is not allowed.
  private safeTransition(
    to: Parameters<WebSocketStateMachine['transition']>[0],
    reason: string
  ): void {
    try {
      this.stateMachine.transition(to, reason);
    } catch (error) {
      console.warn(`Ignored state transition to ${to} (${reason}):`, error);
    }
  }

  // Normalizes a caught value (typed unknown) into an Error.
  private toError(error: unknown): Error {
    return error instanceof Error ? error : new Error(String(error));
  }

  /** Registers `listener` for `event`; listeners run in registration order. */
  on(event: string, listener: (data: any) => void): void {
    const listeners = this.eventListeners.get(event);
    if (listeners) {
      listeners.push(listener);
    } else {
      this.eventListeners.set(event, [listener]);
    }
  }

  /** Calls every listener of `event` with `data`; a throwing listener is logged and skipped. */
  emit(event: string, data?: any): void {
    // Snapshot so listeners added during dispatch do not run in this round.
    const listeners = [...(this.eventListeners.get(event) ?? [])];
    for (const listener of listeners) {
      try {
        listener(data);
      } catch (error) {
        console.error(`Error in event listener for ${event}:`, error);
      }
    }
  }
}

六、实战场景与最佳实践

6.1 金融交易场景处理

在金融等高要求场景中,消息的顺序性、可靠性和低延迟至关重要。需要扩展基础客户端以满足特定领域需求。

// Financial-grade WebSocket client
/**
 * RobustWebSocketClient specialised for trading. Orders are sent with CRITICAL priority
 * and an acknowledgement. Failures are mapped to an OrderResponse instead of being
 * thrown, and market data, order updates and trade confirmations are routed to
 * dedicated handlers.
 *
 * NOTE(review): the following are called but not defined in this listing:
 * generateOrderId, waitForOrderAck, classifyFinancialError, queryOrderStatus,
 * handleOrderUpdate, showConnectionBanner, hideConnectionBanner and syncOrderStatus.
 */
class FinancialWebSocketClient extends RobustWebSocketClient {
  private orderQueue: PriorityMessageQueue;
  private marketDataHandler: MarketDataHandler;
  private tradeReconciler: TradeReconciler;

  constructor(config: FinancialWebSocketConfig) {
    super(config);
    this.orderQueue = new PriorityMessageQueue({
      maxRetries: 3,
      retryDelays: [100, 500, 1000],
      priorityLevels: ['CRITICAL', 'HIGH', 'NORMAL']
    });
    this.marketDataHandler = new MarketDataHandler();
    this.tradeReconciler = new TradeReconciler();
    this.setupFinancialEventHandlers();
  }

  /**
   * Sends an order with a freshly generated id and waits for its order acknowledgement.
   * @returns status 'SENT' on success; on failure, the recovery result of handleOrderFailure()
   */
  async sendOrder(order: TradeOrder): Promise<OrderResponse> {
    const orderId = this.generateOrderId();
    const orderWithId = { ...order, id: orderId, timestamp: Date.now() };

    try {
      await this.send(orderWithId, {
        priority: 'CRITICAL',
        requiresAck: true,
        queueIfOffline: true,
        timeout: 5000 // 5-second timeout
      });

      const ack = await this.waitForOrderAck(orderId);

      return {
        orderId,
        status: 'SENT',
        timestamp: Date.now(),
        ackTimestamp: ack.timestamp
      };
    } catch (error) {
      const normalized = error instanceof Error ? error : new Error(String(error));
      return await this.handleOrderFailure(orderWithId, normalized);
    }
  }

  /**
   * Maps an order failure to an OrderResponse:
   * - NETWORK_ERROR: re-queue the order;
   * - TIMEOUT_ERROR: query its status, because it may have been processed;
   * - PROTOCOL_ERROR: reject it;
   * - anything else: 'ERROR', for manual follow-up.
   */
  private async handleOrderFailure(order: TradeOrder, error: Error): Promise<OrderResponse> {
    const errorType = this.classifyFinancialError(error);

    switch (errorType) {
      case 'NETWORK_ERROR':
        await this.orderQueue.enqueue(order, 'CRITICAL');
        return {
          orderId: order.id,
          status: 'QUEUED',
          timestamp: Date.now(),
          error: 'Network issue, order queued for retry'
        };

      case 'TIMEOUT_ERROR': {
        // The order may already be on the server: ask for its status rather than resend.
        // A failing status query yields UNKNOWN instead of rejecting sendOrder().
        try {
          const status = await this.queryOrderStatus(order.id);
          return {
            orderId: order.id,
            status: status || 'UNKNOWN',
            timestamp: Date.now(),
            warning: 'Order may have been processed'
          };
        } catch (queryError) {
          console.error(`Failed to query status of order ${order.id}:`, queryError);
          return {
            orderId: order.id,
            status: 'UNKNOWN',
            timestamp: Date.now(),
            warning: 'Order may have been processed'
          };
        }
      }

      case 'PROTOCOL_ERROR':
        return {
          orderId: order.id,
          status: 'REJECTED',
          timestamp: Date.now(),
          error: 'Protocol error, order rejected'
        };

      default:
        return {
          orderId: order.id,
          status: 'ERROR',
          timestamp: Date.now(),
          error: `Order failed: ${error.message}`
        };
    }
  }

  // Routes trading messages and switches data sources on connection loss/recovery.
  private setupFinancialEventHandlers(): void {
    this.on('message', (data) => {
      if (data.type === 'market_data') {
        this.marketDataHandler.handleMarketData(data);
      } else if (data.type === 'order_update') {
        this.handleOrderUpdate(data);
      } else if (data.type === 'trade_confirmation') {
        this.tradeReconciler.reconcileTrade(data);
      }
    });

    this.on('disconnected', () => {
      this.marketDataHandler.switchToBackupSource();
      this.showConnectionBanner('Connection lost, using backup data');
    });

    this.on('reconnected', () => {
      this.marketDataHandler.switchToPrimarySource();
      this.hideConnectionBanner();
      // Re-sync order state. Failures (sync throw or async rejection) are logged, not
      // left unhandled.
      Promise.resolve()
        .then(() => this.syncOrderStatus())
        .catch(error => {
          console.error('Order status sync failed:', error);
        });
    });
  }
}

6.2 监控与告警集成

生产系统需要可观测性。将关键指标接入现有监控体系(如APM、日志系统),并设置合理的告警阈值。

// WebSocket monitoring integration
/**
 * Connects a RobustWebSocketClient to metrics reporting, alerting and performance tracking.
 *
 * Alert thresholds are configurable. The defaults are the values previously hard-coded:
 * error rate above 0.1, more than 5 reconnects, and average latency above 1000 ms.
 *
 * NOTE(review): reportError, sendToAPM, calculateHealthScore, calculateReliability and
 * generateRecommendations are called but not defined in this listing.
 */
class WebSocketMonitoringIntegration {
  private metricsCollector: WebSocketMetricsCollector;
  private alertManager: AlertManager;
  private performanceMonitor: PerformanceMonitor;
  // Resolved alert thresholds.
  private readonly thresholds: {
    maxErrorRate: number;
    maxReconnectCount: number;
    maxAvgLatencyMs: number;
  };

  /**
   * @param client client whose metrics and errors are monitored
   * @param thresholds optional overrides for the alert thresholds
   */
  constructor(
    private client: RobustWebSocketClient,
    thresholds: {
      maxErrorRate?: number;
      maxReconnectCount?: number;
      maxAvgLatencyMs?: number;
    } = {}
  ) {
    this.thresholds = {
      maxErrorRate: thresholds.maxErrorRate ?? 0.1,
      maxReconnectCount: thresholds.maxReconnectCount ?? 5,
      maxAvgLatencyMs: thresholds.maxAvgLatencyMs ?? 1000
    };
    this.metricsCollector = new WebSocketMetricsCollector(client);
    this.alertManager = new AlertManager();
    this.performanceMonitor = new PerformanceMonitor();
    this.setupMonitoring();
  }

  // Subscribes to metrics and client errors and starts performance tracking.
  private setupMonitoring(): void {
    this.metricsCollector.on('metrics', (metrics) => {
      this.reportMetrics(metrics);
      this.checkAlerts(metrics);
    });

    this.performanceMonitor.startTracking();

    this.client.on('error', (error) => {
      this.reportError(error);
    });
  }

  // Sends a telemetry snapshot to the APM system and logs it to the console.
  private reportMetrics(metrics: WebSocketMetrics): void {
    const telemetryData = {
      timestamp: new Date().toISOString(),
      connectionState: metrics.connectionState,
      messageCount: metrics.messageCount,
      errorCount: metrics.errorCount,
      avgLatency: metrics.avgLatency,
      reconnectCount: metrics.reconnectCount,
      uptime: metrics.uptime
    };

    this.sendToAPM(telemetryData);
    console.log('WebSocket Metrics:', telemetryData);
  }

  // Raises an alert for each metric above its configured threshold.
  private checkAlerts(metrics: WebSocketMetrics): void {
    const { maxErrorRate, maxReconnectCount, maxAvgLatencyMs } = this.thresholds;

    if (metrics.errorRate > maxErrorRate) {
      this.alertManager.triggerAlert({
        type: 'HIGH_ERROR_RATE',
        severity: 'WARNING',
        message: `WebSocket error rate is ${(metrics.errorRate * 100).toFixed(1)}%`,
        details: metrics
      });
    }

    if (metrics.reconnectCount > maxReconnectCount) {
      this.alertManager.triggerAlert({
        type: 'FREQUENT_RECONNECT',
        severity: 'ERROR',
        message: `WebSocket reconnected ${metrics.reconnectCount} times recently`,
        details: metrics
      });
    }

    if (metrics.avgLatency > maxAvgLatencyMs) {
      this.alertManager.triggerAlert({
        type: 'HIGH_LATENCY',
        severity: 'WARNING',
        message: `WebSocket average latency is ${metrics.avgLatency}ms`,
        details: metrics
      });
    }
  }

  /** Builds a report from aggregated metrics, recent alerts and performance data. */
  async generateMonitoringReport(): Promise<MonitoringReport> {
    const metrics = await this.metricsCollector.getAggregatedMetrics();
    const alerts = this.alertManager.getRecentAlerts();
    const performance = this.performanceMonitor.getPerformanceMetrics();

    return {
      summary: {
        overallHealth: this.calculateHealthScore(metrics),
        uptime: metrics.uptime,
        reliability: this.calculateReliability(metrics)
      },
      metrics,
      alerts,
      performance,
      recommendations: this.generateRecommendations(metrics, alerts)
    };
  }
}

七、测试策略与质量保障

7.1 错误场景模拟测试

通过模拟各种网络故障和服务器异常,验证客户端恢复策略的有效性,这是保障线上稳定性的关键环节。

// WebSocket error-scenario tester
/**
 * Runs failure scenarios against a MockWebSocketServer and records one TestResult each.
 * Every scenario gets a fresh, unconnected client, and the scenario logic connects it
 * itself. A scenario that throws is recorded as failed instead of aborting the run.
 *
 * NOTE(review): configureMockServer, executeScenarioLogic, collectScenarioMetrics,
 * generateTestReport, waitForEvent, verifyMessageDelivery, calculateRecoveryTime and
 * client.getHeartbeatStatus are called but not defined in this listing.
 */
class WebSocketErrorScenarioTester {
  private mockServer: MockWebSocketServer;
  private client: RobustWebSocketClient;
  private testResults: TestResult[] = [];

  constructor() {
    this.mockServer = new MockWebSocketServer();
    this.client = this.createClient();
  }

  /** Runs every scenario sequentially and returns the aggregated report. */
  async runAllErrorScenarios(): Promise<TestReport> {
    const scenarios = [
      'NETWORK_DISCONNECT',
      'SERVER_RESTART',
      'HIGH_LATENCY',
      'PACKET_LOSS',
      'PROTOCOL_ERROR',
      'AUTH_FAILURE',
      'RATE_LIMITING',
      'MESSAGE_CORRUPTION'
    ];

    for (const scenario of scenarios) {
      const result = await this.runScenario(scenario);
      this.testResults.push(result);
    }

    return this.generateTestReport();
  }

  // Builds a client with the test configuration.
  private createClient(): RobustWebSocketClient {
    return new RobustWebSocketClient({
      url: 'ws://localhost:8080',
      reconnect: {
        maxReconnectAttempts: 5,
        baseDelay: 1000
      }
    });
  }

  // Best-effort close of the current client, then replacement with a fresh one.
  private async resetClient(): Promise<void> {
    try {
      await this.client.close();
    } catch (error) {
      // close() may reject when the client is not in a closable state; that is fine here.
      console.warn('Test client did not close cleanly:', error);
    }
    this.client = this.createClient();
  }

  /**
   * Runs one scenario. The scenario logic is responsible for connecting; connecting
   * here as well would make its own connect() fail with "Cannot connect from state".
   */
  private async runScenario(scenario: string): Promise<TestResult> {
    console.log(`Running scenario: ${scenario}`);

    this.configureMockServer(scenario);

    const startTime = Date.now();
    let result: ScenarioResult;
    try {
      result = await this.executeScenarioLogic(scenario);
    } catch (error) {
      result = {
        success: false,
        recoveryTime: 0,
        errors: [error instanceof Error ? error.message : String(error)]
      };
    }
    const duration = Date.now() - startTime;

    const metrics = await this.collectScenarioMetrics();
    // Do not let one scenario's connection state leak into the next.
    await this.resetClient();

    return {
      scenario,
      passed: result.success,
      duration,
      metrics,
      errors: result.errors,
      recoveryTime: result.recoveryTime
    };
  }

  /**
   * Disconnect scenario:
   * 1. Confirm the link works.
   * 2. Cut the network and expect 'disconnected'.
   * 3. Send while offline, so the message is queued.
   * 4. Restore the network and expect 'connected'.
   * 5. Verify the queued message was delivered.
   */
  private async testNetworkDisconnect(): Promise<ScenarioResult> {
    await this.client.connect();
    await this.client.send({ type: 'test', id: 'pre_disconnect' });

    // Start waiting before triggering, so a fast event cannot be missed.
    const disconnected = this.waitForEvent('disconnected', 5000);
    this.mockServer.simulateNetworkDisconnect();
    const disconnectDetected = await disconnected;

    // Sent while offline: must be queued and delivered after reconnection.
    await this.client.send({ type: 'test', id: 'during_disconnect' }, {
      queueIfOffline: true
    });

    const reconnected = this.waitForEvent('connected', 15000);
    this.mockServer.restoreNetwork();
    const reconnectSuccess = await reconnected;

    const messageDelivered = await this.verifyMessageDelivery('during_disconnect');

    return {
      success: disconnectDetected && reconnectSuccess && messageDelivered,
      recoveryTime: this.calculateRecoveryTime(),
      errors: []
    };
  }

  /**
   * High-latency scenario: 2 s latency with 500 ms jitter. It passes when the average
   * round trip stays under 3 s and the heartbeat interval has been stretched beyond 30 s.
   */
  private async testHighLatency(): Promise<ScenarioResult> {
    this.mockServer.setLatency(2000);
    this.mockServer.setJitter(500);

    await this.client.connect();

    const latencies: number[] = [];
    for (let i = 0; i < 10; i++) {
      const start = Date.now();
      // requiresAck so the measurement covers the round trip, not just the local send.
      await this.client.send({ type: 'latency_test', id: i }, { requiresAck: true });
      latencies.push(Date.now() - start);
    }

    const avgLatency = latencies.reduce((a, b) => a + b, 0) / latencies.length;
    const acceptable = avgLatency < 3000;

    const heartbeatStatus = this.client.getHeartbeatStatus();
    const adapted = heartbeatStatus.interval > 30000;

    return {
      success: acceptable && adapted,
      recoveryTime: 0,
      errors: acceptable ? [] : [`High latency: ${avgLatency}ms`]
    };
  }
}

结语:构建生产级的WebSocket错误处理

WebSocket错误处理的艺术在于在复杂性与可靠性之间找到平衡。通过本文的模式库,你可以:

  1. 预见性处理 - 通过状态机和错误分类提前识别问题
  2. 智能恢复 - 基于网络质量和历史数据的自适应重连
  3. 优雅降级 - 多级备用方案确保核心功能可用
  4. 全面监控 - 实时健康评估和预警机制

真正的稳健通信不是避免错误,而是在错误发生时系统能够自我修复、自我适应、自我优化。这将WebSocket从简单的通信协议升级为可靠的基础设施组件。

关键成功指标:

  • 连接成功率 > 99.9%
  • 自动恢复时间 < 30秒
  • 消息投递成功率 > 99.99%
  • 用户体验无感知中断

记住:优秀的错误处理是用户看不见的,但能让他们始终感受到产品的稳定性与可靠性。




上一篇:AI Agent控制智能家居PoC实践:基于Dify与AgentGateway实现集成与链路追踪
下一篇:AI规模化运营公众号矩阵:跨赛道通杀的搜索流量方法论与实战拆解
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-17 19:01 , Processed in 0.142334 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表