找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

478

积分

0

好友

66

主题
发表于 3 天前 | 查看: 6| 回复: 0

现代智能驾驶系统普遍基于Linux操作系统构建,其稳定性、高效性以及在关键时刻的可靠性至关重要。这与Linux的进程调度机制紧密相关,例如,紧急状况下需要立即触发进程A进行干预,但若调度时间被进程B占用,则可能导致严重后果。因此,一套关键且高效的调度算法是行车安全的基石。

考虑一个包含以下实时任务的自动驾驶系统:

  1. 激光雷达数据处理:周期50ms,处理时间5ms,最大允许延迟2ms。
  2. 摄像头图像识别:周期100ms,处理时间20ms,最大允许延迟10ms。
  3. 控制决策:周期20ms,处理时间2ms,最大允许延迟1ms。
  4. 紧急刹车:事件触发,处理时间1ms,必须在5ms内响应。

该系统运行在8核CPU上,包含4个高性能核心(P-core)和4个能效核心(E-core)。以下是完整的实时调度方案设计。

一、 任务分析与调度策略选择

1.1 任务特性分析

任务名称 类型 周期(T) 执行时间(C) 最大允许延迟 相对截止时间(D) 关键性
激光雷达数据处理 周期性 50ms 5ms 2ms 52ms
摄像头图像识别 周期性 100ms 20ms 10ms 110ms
控制决策 周期性 20ms 2ms 1ms 21ms 最高
紧急刹车 事件触发 N/A 1ms 5ms 5ms 关键

1.2 调度策略选择原则

  1. 控制决策:使用SCHED_DEADLINE,拥有最短周期和最紧迫的截止时间,赋予最高调度优先级。
  2. 紧急刹车:使用SCHED_FIFO,事件触发型任务需要立即获得CPU响应。
  3. 激光雷达处理:使用SCHED_DEADLINE,周期和截止时间中等。
  4. 摄像头识别:使用SCHED_DEADLINE,周期和截止时间最长。

二、 CPU核心分配方案

2.1 大小核架构分析

  • P-core (4个):高性能核心,适合运行实时任务。
  • E-core (4个):能效核心,适合运行后台非实时任务。

2.2 核心分配策略

# CPU核心分配映射
# CPU0 (P-core): 控制决策 (SCHED_DEADLINE)
# CPU1 (P-core): 激光雷达 (SCHED_DEADLINE)
# CPU2 (P-core): 摄像头识别 (SCHED_DEADLINE)
# CPU3 (P-core): 紧急刹车 (SCHED_FIFO) + 监控任务
# CPU4-7 (E-core): 非实时任务 (导航、日志、通信等)

三、 详细配置与参数调优

3.1 内核配置要求

# 必需的内核配置选项(/boot/config-*)
CONFIG_PREEMPT_RT=y                    # 实时内核补丁
CONFIG_HIGH_RES_TIMERS=y              # 高精度定时器
CONFIG_NO_HZ_FULL=y                   # 全无滴答模式
CONFIG_SCHED_DEADLINE=y               # 截止时间调度
CONFIG_CPU_ISOLATION=y                # CPU隔离支持
CONFIG_RCU_NOCB_CPU=y                 # RCU回调隔离
CONFIG_IRQ_FORCED_THREADING=y         # 中断线程化
CONFIG_PREEMPT_NOTIFIERS=y            # 抢占通知

3.2 启动参数配置

# /boot/grub/grub.cfg 或 /etc/default/grub 中添加
GRUB_CMDLINE_LINUX="
    # CPU性能设置
    intel_pstate=disable              # 禁用Intel P-state
    cpufreq.default_governor=performance  # 性能模式
    clocksource=tsc                   # TSC时钟源

    # 实时性优化
    isolcpus=0,1,2,3                  # 隔离P-core用于实时任务
    nohz_full=0,1,2,3                 # P-core全无滴答
    rcu_nocbs=0,1,2,3                 # RCU回调隔离
    rcu_nocb_poll                     # RCU轮询模式

    # 中断优化
    threadirqs                        # 线程化中断
    noirqbalance                      # 禁用IRQ平衡

    # 内存管理
    transparent_hugepage=never        # 禁用透明大页
    default_hugepagesz=1G             # 预分配大页
    hugepages=16                      # 16个1G大页

    # 网络优化
    skb_defer_max=0                   # 立即处理网络包
"

3.3 系统服务配置

为优化实时性,需要配置一个 systemd服务 在启动时应用各项设置。

# /etc/systemd/system/rt-optimization.service
[Unit]
Description=Real-time System Optimization
Before=multi-user.target

[Service]
Type=oneshot
RemainAfterExit=yes
# CPU频率锁定为最高性能
ExecStart=/bin/bash -c 'echo performance | tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor'
ExecStart=/bin/bash -c 'echo 1 | tee /sys/devices/system/cpu/cpu*/cpufreq/energy_performance_preference'

# CPU隔离确认
ExecStart=/bin/bash -c 'echo 0 > /sys/devices/system/cpu/cpu0/online'
ExecStart=/bin/bash -c 'echo 1 > /sys/devices/system/cpu/cpu0/online'
ExecStart=/bin/bash -c 'echo 0 > /sys/devices/system/cpu/cpu1/online'
ExecStart=/bin/bash -c 'echo 1 > /sys/devices/system/cpu/cpu1/online'
ExecStart=/bin/bash -c 'echo 0 > /sys/devices/system/cpu/cpu2/online'
ExecStart=/bin/bash -c 'echo 1 > /sys/devices/system/cpu/cpu2/online'
ExecStart=/bin/bash -c 'echo 0 > /sys/devices/system/cpu/cpu3/online'
ExecStart=/bin/bash -c 'echo 1 > /sys/devices/system/cpu/cpu3/online'

# 实时带宽设置
ExecStart=/bin/bash -c 'echo 1000000 > /proc/sys/kernel/sched_rt_period_us'
ExecStart=/bin/bash -c 'echo 950000 > /proc/sys/kernel/sched_rt_runtime_us'

# 内存锁定
ExecStart=/bin/bash -c 'echo 50 > /proc/sys/vm/swappiness'
ExecStart=/bin/bash -c 'echo 1 > /proc/sys/vm/overcommit_memory'

# 网络优化
ExecStart=/bin/bash -c 'echo 1 > /proc/sys/net/ipv4/tcp_low_latency'
ExecStart=/bin/bash -c 'echo 0 > /proc/sys/net/ipv4/tcp_slow_start_after_idle'

[Install]
WantedBy=multi-user.target

四、 实时任务配置脚本

4.1 任务启动脚本

#!/bin/bash
# /usr/local/bin/start_rt_tasks.sh

# 设置错误处理
set -e

# 创建cpuset组
mkdir -p /sys/fs/cgroup/cpuset/rt_pcores
echo "0-3" > /sys/fs/cgroup/cpuset/rt_pcores/cpuset.cpus
echo "0" > /sys/fs/cgroup/cpuset/rt_pcores/cpuset.mems

# 将当前shell移到cpuset
echo $$ > /sys/fs/cgroup/cpuset/rt_pcores/cgroup.procs

# 1. 紧急刹车任务 (最高优先级,CPU3专用)
# 绑定到CPU3,设置SCHED_FIFO优先级99
taskset -c 3 chrt -f 99 /opt/autodrive/emergency_brake \
  --response-time 5ms \
  --wcet 1ms &
BRAKE_PID=$!
echo $BRAKE_PID > /var/run/autodrive/brake.pid
# 锁定刹车任务内存
echo $BRAKE_PID > /sys/fs/cgroup/cpuset/rt_pcores/tasks
grep -q "mlockall" /proc/$BRAKE_PID/status || {
  nsenter -t $BRAKE_PID -m bash -c 'libmlockall.so lockall'
}

# 2. 控制决策任务 (CPU0,最短周期)
taskset -c 0 chrt -d \
  --sched-runtime 2000000 \     # 2ms runtime
  --sched-deadline 21000000 \   # 21ms deadline
  --sched-period 20000000 \     # 20ms period
  0 /opt/autodrive/control_decision \
  --cycle 20ms \
  --deadline 21ms &
CONTROL_PID=$!
echo $CONTROL_PID > /var/run/autodrive/control.pid

# 3. 激光雷达处理任务 (CPU1)
taskset -c 1 chrt -d \
  --sched-runtime 5000000 \     # 5ms runtime
  --sched-deadline 52000000 \   # 52ms deadline
  --sched-period 50000000 \     # 50ms period
  0 /opt/autodrive/lidar_processor \
  --cycle 50ms \
  --deadline 52ms &
LIDAR_PID=$!
echo $LIDAR_PID > /var/run/autodrive/lidar.pid

# 4. 摄像头识别任务 (CPU2)
taskset -c 2 chrt -d \
  --sched-runtime 20000000 \    # 20ms runtime
  --sched-deadline 110000000 \  # 110ms deadline
  --sched-period 100000000 \    # 100ms period
  0 /opt/autodrive/camera_vision \
  --cycle 100ms \
  --deadline 110ms &
CAMERA_PID=$!
echo $CAMERA_PID > /var/run/autodrive/camera.pid

# 设置中断亲和性
configure_interrupt_affinity() {
  # 获取PCI设备中断号
  LIDAR_IRQ=$(get_irq_for_device "0000:01:00.0")
  CAMERA_IRQ=$(get_irq_for_device "0000:02:00.0")
  BRAKE_IRQ=$(get_irq_for_device "0000:03:00.0")

  # 设置中断亲和性
  echo 2 > /proc/irq/$LIDAR_IRQ/smp_affinity    # CPU1 (二进制0010)
  echo 4 > /proc/irq/$CAMERA_IRQ/smp_affinity   # CPU2 (二进制0100)
  echo 8 > /proc/irq/$BRAKE_IRQ/smp_affinity    # CPU3 (二进制1000)
  # 控制决策任务通常由定时器触发,无需外部中断
}

# 配置网络优先级(如果使用车载网络)
configure_network_qos() {
  # 设置控制决策消息最高优先级
  tc qdisc add dev eth0 root handle 1: prio bands 3
  tc filter add dev eth0 protocol ip parent 1:0 prio 0 \
    u32 match ip dport 5000 0xffff flowid 1:1
  tc qdisc add dev eth0 parent 1:1 handle 10: pfifo limit 1000
}

echo "Real-time tasks started successfully"

4.2 中断亲和性辅助函数

#!/bin/bash
# /usr/local/bin/get_irq_for_device

DEVICE=$1
if [ -z "$DEVICE" ]; then
  echo "Usage: $0 <PCI device ID>"
  exit 1
fi

# 从/proc/interrupts中查找设备中断
IRQ=$(grep -l "$DEVICE" /proc/irq/*/spurious 2>/dev/null | \
      cut -d/ -f4 | head -1)

if [ -z "$IRQ" ]; then
  # 尝试从lspci和/proc/interrupts匹配
  VENDOR_DEVICE=$(lspci -s "$DEVICE" -n | awk '{print $3}')
  IRQ=$(grep "$VENDOR_DEVICE" /proc/interrupts | awk '{print $1}' | cut -d: -f1)
fi

echo $IRQ

五、 实时性验证方案

5.1 离线可调度性分析

#!/usr/bin/env python3
# /opt/autodrive/schedulability_test.py

import math

class RTTask:
    def __init__(self, name, C, D, T, core=None):
        self.name = name
        self.C = C  # 最坏执行时间 (ms)
        self.D = D  # 相对截止时间 (ms)
        self.T = T  # 周期 (ms)
        self.core = core
        self.U = C / T  # 利用率

    def __str__(self):
        return f"{self.name}: C={self.C}ms, D={self.D}ms, T={self.T}ms, U={self.U:.3f}"

def rate_monotonic_analysis(tasks):
    """速率单调分析(固定优先级)"""
    tasks_sorted = sorted(tasks, key=lambda x: x.T)  # 按周期排序
    print("\n速率单调分析 (Rate Monotonic):")

    for i, task in enumerate(tasks_sorted):
        # 计算响应时间
        R = task.C
        while True:
            R_next = task.C
            for hp_task in tasks_sorted[:i]:  # 更高优先级任务
                R_next += math.ceil(R / hp_task.T) * hp_task.C

            if R_next > task.D:
                print(f"  {task.name}: 不可调度 (R={R_next:.2f}ms > D={task.D}ms)")
                return False
            if R_next == R:
                print(f"  {task.name}: 可调度 (R={R:.2f}ms)")
                break
            if R_next > 100 * task.T:  # 防止无限循环
                print(f"  {task.name}: 分析超时")
                return False
            R = R_next

    return True

def edf_analysis(tasks):
    """最早截止时间优先分析"""
    total_utilization = sum(task.U for task in tasks)
    print(f"\nEDF总利用率: {total_utilization:.3f}")

    if total_utilization <= 1.0:
        print("  EDF可调度 (U_total ≤ 1.0)")
        return True
    else:
        print(f"  EDF可能不可调度 (U_total = {total_utilization:.3f} > 1.0)")
        return False

def deadline_monotonic_analysis(tasks):
    """截止时间单调分析"""
    tasks_sorted = sorted(tasks, key=lambda x: x.D)  # 按截止时间排序
    print("\n截止时间单调分析 (Deadline Monotonic):")

    for i, task in enumerate(tasks_sorted):
        R = task.C
        while True:
            R_next = task.C
            for hp_task in tasks_sorted[:i]:  # 更高优先级任务
                R_next += math.ceil(R / hp_task.T) * hp_task.C

            if R_next > task.D:
                print(f"  {task.name}: 不可调度 (R={R_next:.2f}ms > D={task.D}ms)")
                return False
            if R_next == R:
                print(f"  {task.name}: 可调度 (R={R:.2f}ms)")
                break
            R = R_next

    return True

# 定义任务集
tasks_per_core = {
    "CPU0": [
        RTTask("Control", 2, 21, 20),
    ],
    "CPU1": [
        RTTask("LiDAR", 5, 52, 50),
    ],
    "CPU2": [
        RTTask("Camera", 20, 110, 100),
    ],
    "CPU3": [
        RTTask("Emergency", 1, 5, 100),  # 假设最小间隔100ms
    ]
}

# 执行可调度性分析
print("自动驾驶系统可调度性分析")
print("=" * 50)

all_schedulable = True
for core, tasks in tasks_per_core.items():
    print(f"\n{core} 核心任务:")
    for task in tasks:
        print(f"  {task}")

    if core == "CPU3":  # SCHED_FIFO,使用截止时间单调分析
        schedulable = deadline_monotonic_analysis(tasks)
    else:  # SCHED_DEADLINE,使用EDF分析
        schedulable = edf_analysis(tasks)

    if not schedulable:
        all_schedulable = False

print("\n" + "=" * 50)
if all_schedulable:
    print("✅ 所有任务理论上可调度")
else:
    print("❌ 部分任务可能不可调度,需要调整")

5.2 在线实时性监控

#!/bin/bash
# /usr/local/bin/rt_monitor.sh

LOGFILE="/var/log/autodrive/rt_monitor.log"
ALERTFILE="/var/log/autodrive/rt_alerts.log"

# 监控函数
monitor_rt_tasks() {
    TIMESTAMP=$(date +%Y%m%d-%H%M%S)

    echo "=== Real-time Monitor Report - $TIMESTAMP ===" >> $LOGFILE

    # 1. 检查任务是否存在
    TASK_PIDS=()
    for task in control lidar camera brake; do
        PID_FILE="/var/run/autodrive/$task.pid"
        if [ -f "$PID_FILE" ]; then
            PID=$(cat $PID_FILE)
            if ps -p $PID > /dev/null 2>&1; then
                TASK_PIDS[$task]=$PID
                echo "$task: PID $PID is running" >> $LOGFILE
            else
                echo "ALERT: $task is not running!" | tee -a $ALERTFILE >> $LOGFILE
            fi
        else
            echo "ALERT: PID file for $task not found!" | tee -a $ALERTFILE >> $LOGFILE
        fi
    done

    # 2. 检查调度策略
    echo -e "\nScheduling Policy Check:" >> $LOGFILE
    for task in "${!TASK_PIDS[@]}"; do
        PID=${TASK_PIDS[$task]}
        POLICY=$(chrt -p $PID 2>/dev/null | grep policy | awk '{print $NF}')
        PRIORITY=$(chrt -p $PID 2>/dev/null | grep priority | awk '{print $NF}')
        echo "$task: policy=$POLICY, priority=$PRIORITY" >> $LOGFILE
    done

    # 3. 测量调度延迟(使用tracepoints)
    if [ -d /sys/kernel/debug/tracing ]; then
        echo -e "\nScheduling Latency:" >> $LOGFILE
        # 启用调度跟踪
        echo 0 > /sys/kernel/debug/tracing/tracing_on
        echo > /sys/kernel/debug/tracing/trace
        echo 1 > /sys/kernel/debug/tracing/events/sched/sched_switch/enable
        echo 1 > /sys/kernel/debug/tracing/events/sched/sched_wakeup/enable

        # 收集1秒的数据
        sleep 1

        echo 0 > /sys/kernel/debug/tracing/events/sched/sched_switch/enable
        echo 0 > /sys/kernel/debug/tracing/events/sched/sched_wakeup/enable

        # 分析延迟
        for task in "${!TASK_PIDS[@]}"; do
            PID=${TASK_PIDS[$task]}
            MAX_LATENCY=$(cat /sys/kernel/debug/tracing/trace | \
                grep "pid=$PID" | \
                grep sched_wakeup | \
                awk -F'latency=' '{print $2}' | \
                awk '{print $1}' | \
                sort -nr | head -1)

            if [ -n "$MAX_LATENCY" ]; then
                echo "$task: max wakeup latency = ${MAX_LATENCY}ns" >> $LOGFILE
                if [ $MAX_LATENCY -gt 1000000 ]; then # 超过1ms
                    echo "ALERT: $task wakeup latency ${MAX_LATENCY}ns > 1ms" | \
                        tee -a $ALERTFILE >> $LOGFILE
                fi
            fi
        done
    fi

    # 4. CPU利用率检查
    echo -e "\nCPU Utilization (last 10s):" >> $LOGFILE
    mpstat -P 0-3 1 10 | grep Average >> $LOGFILE

    # 5. 内存锁定检查
    echo -e "\nMemory Lock Status:" >> $LOGFILE
    for task in "${!TASK_PIDS[@]}"; do
        PID=${TASK_PIDS[$task]}
        if grep -q "VmLck" /proc/$PID/status 2>/dev/null; then
            LOCKED=$(grep VmLck /proc/$PID/status | awk '{print $2}')
            echo "$task: locked memory = $LOCKED kB" >> $LOGFILE
        fi
    done

    # 6. 中断统计
    echo -e "\nInterrupt Statistics:" >> $LOGFILE
    grep -E "(CPU0|CPU1|CPU2|CPU3)" /proc/interrupts | head -20 >> $LOGFILE

    echo -e "\n" >> $LOGFILE
}

# 主监控循环
while true; do
    monitor_rt_tasks
    sleep 10  # 每10秒监控一次

    # 如果存在警报,发送通知
    if [ -s $ALERTFILE ]; then
        # 这里可以添加邮件或网络通知
        # send_alert "$(tail -n 10 $ALERTFILE)"
        echo "Alerts detected, check $ALERTFILE"
        # 清空警报文件,避免重复通知
        > $ALERTFILE
    fi
done

5.3 压力测试与最坏情况验证

#!/bin/bash
# /opt/autodrive/stress_test.sh

echo "Starting stress test for autonomous driving system"
echo "This will run for 5 minutes under maximum load"

# 1. 启动压力工具
# CPU压力 (所有E-core)
stress-ng --cpu 4 --cpu-method all --timeout 300 &
# 内存压力
stress-ng --vm 2 --vm-bytes 2G --timeout 300 &
# I/O压力
stress-ng --io 4 --timeout 300 &

# 2. 在P-core上运行实时性测试
echo "Testing real-time performance under stress..."
# 在每个P-core上运行cyclictest
for cpu in 0 1 2 3; do
    taskset -c $cpu cyclictest \
        -t1 -p 80 -n -i 1000 -l 60000 -q -m \
        --histogram=1000 \
        --duration=300 \
        --quiet \
        > /var/log/autodrive/stress_cpu${cpu}.log 2>&1 &
done

# 3. 监控实时任务性能
echo "Monitoring real-time task performance..."
# 使用perf记录调度事件
perf record \
    -e sched:sched_switch \
    -e sched:sched_wakeup \
    -e sched:sched_wakeup_new \
    -e sched:sched_stat_runtime \
    -a -g -o /var/log/autodrive/perf_stress.data \
    sleep 300 &

# 4. 等待测试完成
echo "Stress test running for 300 seconds..."
sleep 300

# 5. 分析结果
echo "Analyzing results..."
for cpu in 0 1 2 3; do
    echo -e "\nCPU$cpu Results:"
    MAX_LAT=$(grep "Max Latencies" /var/log/autodrive/stress_cpu${cpu}.log | awk '{print $3}')
    AVG_LAT=$(grep "Avg Latencies" /var/log/autodrive/stress_cpu${cpu}.log | awk '{print $3}')
    HIST_1MS=$(grep "001000" /var/log/autodrive/stress_cpu${cpu}.log | awk '{print $2}')

    echo "  Max Latency: ${MAX_LAT}ns"
    echo "  Avg Latency: ${AVG_LAT}ns"
    echo "  >1ms samples: ${HIST_1MS}"

    # 检查是否满足要求
    if [ ${MAX_LAT:-0} -gt 2000000 ]; then # 2ms
        echo "  ❌ FAIL: Max latency > 2ms"
    else
        echo "  ✅ PASS: Max latency within limits"
    fi
done

# 生成性能报告
perf report -i /var/log/autodrive/perf_stress.data \
    --stdio --sort comm,dso > /var/log/autodrive/perf_report.txt

echo "Stress test completed. Reports saved to /var/log/autodrive/"

六、 系统维护与故障处理

6.1 自动恢复脚本

#!/bin/bash
# /usr/local/bin/rt_recovery.sh

# 检测并恢复失败的实时任务
check_and_restart_task() {
    local task_name=$1
    local pid_file="/var/run/autodrive/$task_name.pid"
    local restart_cmd=""

    case $task_name in
        "brake")
            restart_cmd="taskset -c 3 chrt -f 99 /opt/autodrive/emergency_brake"
            ;;
        "control")
            restart_cmd="taskset -c 0 chrt -d --sched-runtime 2000000 --sched-deadline 21000000 --sched-period 20000000 0 /opt/autodrive/control_decision"
            ;;
        "lidar")
            restart_cmd="taskset -c 1 chrt -d --sched-runtime 5000000 --sched-deadline 52000000 --sched-period 50000000 0 /opt/autodrive/lidar_processor"
            ;;
        "camera")
            restart_cmd="taskset -c 2 chrt -d --sched-runtime 20000000 --sched-deadline 110000000 --sched-period 100000000 0 /opt/autodrive/camera_vision"
            ;;
    esac

    if [ -f "$pid_file" ]; then
        local pid=$(cat "$pid_file")
        if ! kill -0 "$pid" 2>/dev/null; then
            echo "$(date): $task_name is not running, restarting..." >> /var/log/autodrive/recovery.log
            # 清理残留资源
            if [ -d "/proc/$pid" ]; then
                kill -9 "$pid" 2>/dev/null
            fi

            # 重新启动
            eval "$restart_cmd &"
            local new_pid=$!
            echo $new_pid > "$pid_file"
            echo "$(date): $task_name restarted with PID $new_pid" >> /var/log/autodrive/recovery.log

            # 发送警报
            send_alert "Real-time task $task_name restarted"
        fi
    else
        echo "$(date): PID file for $task_name not found" >> /var/log/autodrive/recovery.log
    fi
}

# 主恢复循环
while true; do
    for task in brake control lidar camera; do
        check_and_restart_task $task
    done
    sleep 5  # 每5秒检查一次
done

6.2 性能数据分析工具

#!/usr/bin/env python3
# /opt/autodrive/analyze_rt_performance.py

import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
import json

class RTPerformanceAnalyzer:
    def __init__(self, log_dir="/var/log/autodrive"):
        self.log_dir = log_dir
        self.tasks = ['brake', 'control', 'lidar', 'camera']

    def parse_latency_logs(self):
        """解析延迟日志"""
        latency_data = {}

        for task in self.tasks:
            log_file = f"{self.log_dir}/{task}_latency.log"
            try:
                with open(log_file, 'r') as f:
                    lines = f.readlines()
                    latencies = []
                    for line in lines:
                        if "latency" in line.lower():
                            # 提取延迟值(假设格式:latency=123456ns)
                            parts = line.split()
                            for part in parts:
                                if "latency=" in part:
                                    latency = int(part.split('=')[1].replace('ns', ''))
                                    latencies.append(latency)

                    if latencies:
                        latency_data[task] = {
                            'min': min(latencies),
                            'max': max(latencies),
                            'avg': sum(latencies) / len(latencies),
                            'p99': np.percentile(latencies, 99),
                            'p999': np.percentile(latencies, 99.9),
                            'samples': latencies
                        }
            except FileNotFoundError:
                print(f"Warning: Log file not found for {task}")

        return latency_data

    def generate_report(self, data):
        """生成性能报告"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'tasks': {},
            'compliance': {},
            'recommendations': []
        }

        # 任务要求(单位:纳秒)
        requirements = {
            'brake': {'max': 5000000},      # 5ms
            'control': {'max': 21000000},   # 21ms
            'lidar': {'max': 52000000},     # 52ms
            'camera': {'max': 110000000}    # 110ms
        }

        for task, metrics in data.items():
            report['tasks'][task] = metrics

            if task in requirements:
                if metrics['max'] <= requirements[task]['max']:
                    report['compliance'][task] = 'PASS'
                else:
                    report['compliance'][task] = 'FAIL'
                    report['recommendations'].append(
                        f"{task}: 最大延迟 {metrics['max']/1e6:.2f}ms "
                        f"超过要求 {requirements[task]['max']/1e6:.2f}ms"
                    )

        # 保存报告
        report_file = f"{self.log_dir}/performance_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)

        return report

    def plot_latency_distribution(self, data):
        """绘制延迟分布图"""
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.flatten()

        for idx, task in enumerate(self.tasks):
            if task in data:
                samples = data[task]['samples']
                ax = axes[idx]

                # 直方图
                ax.hist(np.array(samples) / 1e6, bins=50, alpha=0.7)
                ax.set_xlabel('Latency (ms)')
                ax.set_ylabel('Frequency')
                ax.set_title(f'{task.capitalize()} Task Latency Distribution')

                # 添加统计信息
                stats_text = (
                    f"Min: {data[task]['min']/1e6:.2f}ms\n"
                    f"Max: {data[task]['max']/1e6:.2f}ms\n"
                    f"Avg: {data[task]['avg']/1e6:.2f}ms\n"
                    f"P99: {data[task]['p99']/1e6:.2f}ms"
                )
                ax.text(0.95, 0.95, stats_text,
                       transform=ax.transAxes,
                       verticalalignment='top',
                       horizontalalignment='right',
                       bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        plt.tight_layout()
        plot_file = f"{self.log_dir}/latency_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        plt.savefig(plot_file, dpi=150)
        plt.close()

        return plot_file

if __name__ == "__main__":
    analyzer = RTPerformanceAnalyzer()

    print("Analyzing real-time performance data...")

    # 解析数据
    latency_data = analyzer.parse_latency_logs()

    if latency_data:
        # 生成报告
        report = analyzer.generate_report(latency_data)

        print("\nPerformance Summary:")
        print("=" * 60)
        for task, compliance in report['compliance'].items():
            status = "✅" if compliance == 'PASS' else "❌"
            max_latency = report['tasks'][task]['max'] / 1e6
            print(f"{task:15} {status} {compliance:6} Max: {max_latency:6.2f}ms")

        print("\n" + "=" * 60)

        if report['recommendations']:
            print("\nRecommendations:")
            for rec in report['recommendations']:
                print(f"  • {rec}")

        # 生成图表
        plot_file = analyzer.plot_latency_distribution(latency_data)
        print(f"\nLatency distribution plot saved to: {plot_file}")
    else:
        print("No latency data found for analysis")

七、 总结与应急预案

7.1 系统健康检查清单

#!/bin/bash
# /usr/local/bin/health_check.sh

echo "Autonomous Driving System Health Check"
echo "======================================"

check_passed=0
check_failed=0

check_item() {
    local description=$1
    local command=$2
    local expected=$3

    if eval "$command" 2>/dev/null | grep -q "$expected"; then
        echo "✅ $description"
        ((check_passed++))
        return 0
    else
        echo "❌ $description"
        ((check_failed++))
        return 1
    fi
}

# 1. 内核版本检查
check_item "Kernel is PREEMPT_RT" "uname -v" "PREEMPT RT"

# 2. CPU隔离检查
check_item "CPU0-3 are isolated" "cat /sys/devices/system/cpu/isolated" "0-3"

# 3. 实时任务运行检查
for task in control lidar camera brake; do
    check_item "$task task is running" "ps aux | grep -v grep | grep $task" "$task"
done

# 4. 调度策略检查
check_item "Control task is SCHED_DEADLINE" "chrt -p \$(pidof control_decision)" "SCHED_DEADLINE"
check_item "Brake task is SCHED_FIFO" "chrt -p \$(pidof emergency_brake)" "SCHED_FIFO"

# 5. CPU频率检查
check_item "CPU0 in performance mode" "cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor" "performance"

# 6. 内存锁定检查
check_item "Brake task memory is locked" "grep VmLck /proc/\$(pidof emergency_brake)/status" "VmLck"

# 7. 中断亲和性检查
check_item "Brake IRQ on CPU3" "cat /proc/irq/\$(get_irq_for_device 0000:03:00.0)/smp_affinity 2>/dev/null" "8"

echo "======================================"
echo "Passed: $check_passed, Failed: $check_failed"

if [ $check_failed -eq 0 ]; then
    echo "✅ System health check PASSED"
    exit 0
else
    echo "❌ System health check FAILED"
    exit 1
fi

7.2 紧急情况预案

故障场景 检测方法 自动恢复动作 人工干预
实时任务崩溃 进程监控脚本 自动重启任务 检查崩溃原因
调度延迟超标 延迟监控 重启受影响任务 分析系统负载
CPU过热降频 温度监控 降低非关键任务负载 检查冷却系统
内存不足 OOM监控 终止低优先级进程 增加物理内存
网络中断 心跳检测 切换备份网络接口 检查网络设备

通过以上完整的方案设计,自动驾驶系统的实时任务可以得到有效调度和监控,确保在各种情况下都能满足严格的实时性要求。系统具有自愈能力,能够在出现问题时自动恢复,并通过详细的监控和日志为问题分析提供支持。




上一篇:2023年最新JAVA架构师教程 100集JAVA架构师最尖锐的技术剖析 Spring/微服务/JVM/Redis/MySQL/Netty源码与高并发实战
下一篇:阿里顶级架构师操刀 31天完成级高性能购票 MySQL设计、Netty源码、高并发架构与系统安全
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 17:09 , Processed in 0.186996 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表