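Modern intelligent-driving systems are commonly built on Linux, and their stability, efficiency and reliability at critical moments are essential. All three depend heavily on the Linux process scheduler. For example, if process A must run immediately to intervene in an emergency but the CPU is occupied by process B, the consequences can be severe. A well-designed, efficient scheduling scheme is therefore a foundation of driving safety.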
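Consider an autonomous-driving system with the following real-time tasks:
- LiDAR data processing: period 50 ms, execution time 5 ms, maximum allowed latency 2 ms.
- Camera image recognition: period 100 ms, execution time 20 ms, maximum allowed latency 10 ms.
- Control decision: period 20 ms, execution time 2 ms, maximum allowed latency 1 ms.
- Emergency braking: event-triggered, execution time 1 ms, must respond within 5 ms.
The system runs on an 8-core CPU with 4 performance cores (P-cores) and 4 efficiency cores (E-cores). The complete real-time scheduling design follows.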
1. Task Analysis and Scheduling Policy Selection
1.1 Task Characteristics
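| Task | Type | Period (T) | Execution time (C) | Max allowed latency | Relative deadline (D) | Criticality |
|---|---|---|---|---|---|---|
| LiDAR data processing | Periodic | 50 ms | 5 ms | 2 ms | 52 ms | High |
| Camera image recognition | Periodic | 100 ms | 20 ms | 10 ms | 110 ms | Medium |
| Control decision | Periodic | 20 ms | 2 ms | 1 ms | 21 ms | Highest |
| Emergency braking | Event-triggered | N/A | 1 ms | 5 ms | 5 ms | Critical |

For the periodic tasks, D is derived as T + maximum allowed latency. SCHED_DEADLINE, however, enforces runtime ≤ deadline ≤ period. A deadline larger than the period therefore cannot be passed to the kernel as-is, and the kernel-level deadline must be at most T.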
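The derived columns can be recomputed in a few lines. This sketch uses illustrative names (the Task class and its properties) and computes three values: utilization U = C/T, the table's design deadline D = T + maximum latency, and the largest deadline SCHED_DEADLINE would accept, min(D, T). The event-triggered brake task is left out because it has no period.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    name: str
    period_ms: int        # T
    wcet_ms: int          # C
    max_latency_ms: int   # "max allowed latency" column

    @property
    def utilization(self) -> float:
        return self.wcet_ms / self.period_ms

    @property
    def design_deadline_ms(self) -> int:
        # The table derives D = T + maximum allowed latency
        return self.period_ms + self.max_latency_ms

    @property
    def kernel_deadline_ms(self) -> int:
        # SCHED_DEADLINE only accepts runtime <= deadline <= period
        return min(self.design_deadline_ms, self.period_ms)


TASKS = [
    Task("lidar", 50, 5, 2),
    Task("camera", 100, 20, 10),
    Task("control", 20, 2, 1),
]

for t in TASKS:
    print(f"{t.name:8} U={t.utilization:.2f} "
          f"D_design={t.design_deadline_ms}ms D_kernel={t.kernel_deadline_ms}ms")
```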
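1.2 Policy Selection Rationale
- Control decision: SCHED_DEADLINE. It has the shortest period and the tightest deadline. SCHED_DEADLINE uses no static priority. Its jobs run in earliest-deadline-first (EDF) order, and the deadline class as a whole is served before SCHED_FIFO and SCHED_RR.
- Emergency braking: SCHED_FIFO. As an event-triggered task, it must get the CPU as soon as it wakes up.
- LiDAR processing: SCHED_DEADLINE, with a medium period and deadline.
- Camera recognition: SCHED_DEADLINE, with the longest period and deadline.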
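To make the policy choice concrete, here is a toy model of how one CPU picks its next task. The deadline class is consulted first and orders its jobs by absolute deadline (EDF). Only if no deadline job is runnable does SCHED_FIFO run, highest priority first. This is a simplified illustration rather than kernel code, and Job and pick_next are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Job:
    name: str
    policy: str                              # "deadline" or "fifo"
    abs_deadline_ms: Optional[float] = None  # SCHED_DEADLINE: absolute deadline
    rt_priority: int = 0                     # SCHED_FIFO: 1..99, higher runs first


def pick_next(runnable: list) -> Optional[Job]:
    """Toy per-CPU pick: deadline class first (EDF), then SCHED_FIFO by priority."""
    dl_jobs = [j for j in runnable if j.policy == "deadline"]
    if dl_jobs:
        return min(dl_jobs, key=lambda j: j.abs_deadline_ms)
    fifo_jobs = [j for j in runnable if j.policy == "fifo"]
    if fifo_jobs:
        return max(fifo_jobs, key=lambda j: j.rt_priority)
    return None


ready = [
    Job("camera", "deadline", abs_deadline_ms=100),
    Job("control", "deadline", abs_deadline_ms=20),
    Job("brake", "fifo", rt_priority=99),
]
print(pick_next(ready).name)
```

The model shows that a SCHED_FIFO task at priority 99 still waits for any runnable deadline task on the same CPU. This is why keeping the brake task on a core with no SCHED_DEADLINE tasks matters.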
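2. CPU Core Allocation
2.1 Hybrid Core Architecture
- P-cores (4): high-performance cores, suited to real-time tasks.
- E-cores (4): efficiency cores, suited to background, non-real-time tasks.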
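2.2 Core Assignment
# CPU core assignment
# CPU0 (P-core): control decision (SCHED_DEADLINE)
# CPU1 (P-core): LiDAR processing (SCHED_DEADLINE)
# CPU2 (P-core): camera recognition (SCHED_DEADLINE)
# CPU3 (P-core): emergency braking (SCHED_FIFO) + monitoring tasks
# CPU4-7 (E-cores): non-real-time tasks (navigation, logging, communication, etc.)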
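The core assignment maps directly onto the hexadecimal masks accepted by /proc/irq/*/smp_affinity, where bit N set means CPU N may be used. A small helper with illustrative names (cpus_to_mask, CORE_MAP) computes them:

```python
def cpus_to_mask(cpus) -> str:
    """Hex affinity mask for an iterable of CPU numbers (bit N = CPU N)."""
    mask = 0
    for cpu in cpus:
        mask |= 1 << cpu
    return format(mask, "x")


# Core assignment from the list above
CORE_MAP = {
    "control": [0],
    "lidar": [1],
    "camera": [2],
    "brake": [3],
    "housekeeping": [4, 5, 6, 7],
}

for role, cpus in CORE_MAP.items():
    print(f"{role:12} CPUs {cpus} -> mask {cpus_to_mask(cpus)}")
```

The LiDAR, camera and brake entries give the masks 2, 4 and 8 used for the device interrupts in Section 4.1. The housekeeping E-cores give f0.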
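3. Detailed Configuration and Tuning
3.1 Kernel Configuration Requirements
# Required kernel options (check against /boot/config-$(uname -r))
CONFIG_PREEMPT_RT=y # fully preemptible real-time kernel (mainline since Linux 6.12; older kernels need the RT patch set)
CONFIG_HIGH_RES_TIMERS=y # high-resolution timers
CONFIG_NO_HZ_FULL=y # full dynticks (tickless) mode
# SCHED_DEADLINE has no Kconfig option: it is always built in (Linux 3.14 and later)
CONFIG_CPU_ISOLATION=y # CPU isolation support
CONFIG_RCU_NOCB_CPU=y # offload RCU callbacks from the real-time CPUs
CONFIG_IRQ_FORCED_THREADING=y # allows threaded interrupt handlers (threadirqs)
# CONFIG_PREEMPT_NOTIFIERS is not needed for real-time scheduling; it is not user-selectable and is selected automatically (e.g. by KVM)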
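These options can be verified by parsing the installed config file. The sketch below checks a subset of the user-selectable options from the list above and treats only "=y" as enabled. REQUIRED and missing_options are illustrative names, and the /boot/config-<release> location is a distribution convention rather than a guarantee.

```python
import platform
from pathlib import Path

# User-selectable options from the list above
REQUIRED = [
    "CONFIG_PREEMPT_RT",
    "CONFIG_HIGH_RES_TIMERS",
    "CONFIG_NO_HZ_FULL",
    "CONFIG_CPU_ISOLATION",
    "CONFIG_RCU_NOCB_CPU",
    "CONFIG_IRQ_FORCED_THREADING",
]


def missing_options(config_text: str, required=REQUIRED) -> list:
    """Return the required options that are not set to 'y' in a kernel .config."""
    enabled = set()
    for line in config_text.splitlines():
        line = line.strip()
        if line.startswith("CONFIG_") and line.endswith("=y"):
            enabled.add(line.split("=", 1)[0])
    return [opt for opt in required if opt not in enabled]


if __name__ == "__main__":
    path = Path(f"/boot/config-{platform.release()}")
    if path.exists():
        print("missing:", missing_options(path.read_text()) or "none")
    else:
        print(f"{path} not found (the running config may be in /proc/config.gz)")
```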
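3.2 Boot Parameters
# Add to /etc/default/grub, then regenerate grub.cfg (do not edit /boot/grub/grub.cfg directly):
#   update-grub                              (Debian/Ubuntu)
#   grub2-mkconfig -o /boot/grub2/grub.cfg   (RHEL/Fedora)
# Comments cannot go inside the quoted value: everything in it is passed to the kernel.
#
# CPU performance: intel_pstate=disable, cpufreq.default_governor=performance, clocksource=tsc
# Real-time: isolcpus=3 (brake core), nohz_full=0-3 (tickless P-cores), rcu_nocbs=0-3 + rcu_nocb_poll
#   CPUs 0-2 are left out of isolcpus=. A SCHED_DEADLINE task pinned to one CPU needs that CPU to be
#   a root domain of its own, which CPUs removed from scheduler domains by isolcpus= cannot become.
#   Isolate CPUs 0-2 with exclusive cpusets instead.
#   After reboot, check /sys/devices/system/cpu/isolated and /sys/devices/system/cpu/nohz_full;
#   some kernels keep the boot CPU (CPU0) out of nohz_full for timekeeping.
# Interrupts: threadirqs (threaded handlers), irqaffinity=4-7 (default IRQ affinity on the E-cores).
#   There is no boot option that disables IRQ balancing; stop the daemon: systemctl disable --now irqbalance
# Memory: transparent_hugepage=never, default_hugepagesz=1G hugepages=16 (reserve 16 x 1 GiB huge pages)
# Network: net.core.skb_defer_max is a sysctl, settable from the command line via sysctl.* (Linux 5.8+);
#   0 disables deferred skb freeing
GRUB_CMDLINE_LINUX="intel_pstate=disable cpufreq.default_governor=performance clocksource=tsc \
isolcpus=3 nohz_full=0-3 rcu_nocbs=0-3 rcu_nocb_poll \
threadirqs irqaffinity=4-7 \
transparent_hugepage=never default_hugepagesz=1G hugepages=16 \
sysctl.net.core.skb_defer_max=0"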
3.3 System Service Configuration
A oneshot systemd service applies the runtime tuning at boot. A "-" prefix on ExecStart= makes systemd ignore that command's failure. Without it, one failing command skips the remaining ones and fails the unit.
# /etc/systemd/system/rt-optimization.service
[Unit]
Description=Real-time System Optimization
Before=multi-user.target
[Service]
Type=oneshot
RemainAfterExit=yes
# Lock the CPU frequency governor to maximum performance
ExecStart=/bin/bash -c 'echo performance | tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor'
# Energy/performance preference only exists while intel_pstate runs in active (HWP) mode; it is absent with intel_pstate=disable
ExecStart=-/bin/bash -c 'echo performance | tee /sys/devices/system/cpu/cpu*/cpufreq/energy_performance_preference'
# Log the effective isolation masks to the journal ("$$" is systemd's escape for a literal "$")
ExecStart=/bin/bash -c 'echo "isolated=$$(cat /sys/devices/system/cpu/isolated) nohz_full=$$(cat /sys/devices/system/cpu/nohz_full)"'
# Real-time bandwidth (kernel defaults: RT and deadline tasks may use at most 95% of every 1 s period)
ExecStart=/bin/bash -c 'echo 1000000 > /proc/sys/kernel/sched_rt_period_us'
ExecStart=/bin/bash -c 'echo 950000 > /proc/sys/kernel/sched_rt_runtime_us'
# Memory: lower the swap tendency and allow overcommit. This does not lock memory; each real-time task must call mlockall() itself
ExecStart=/bin/bash -c 'echo 50 > /proc/sys/vm/swappiness'
ExecStart=/bin/bash -c 'echo 1 > /proc/sys/vm/overcommit_memory'
# Networking (tcp_low_latency has had no effect since Linux 4.14 and is kept only for compatibility)
ExecStart=-/bin/bash -c 'echo 1 > /proc/sys/net/ipv4/tcp_low_latency'
ExecStart=/bin/bash -c 'echo 0 > /proc/sys/net/ipv4/tcp_slow_start_after_idle'
[Install]
WantedBy=multi-user.target
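The sched_rt_period_us/sched_rt_runtime_us pair written by the service (1000000/950000, which are also the kernel defaults) does more than throttle SCHED_FIFO. SCHED_DEADLINE admission control uses the same ratio and refuses a new deadline task if the total runtime/period in its root domain would exceed 95% per CPU. The sketch below works through that arithmetic for this design. It assumes each P-core is its own root domain, so the check reduces to one sum per CPU, and its names are illustrative.

```python
from fractions import Fraction

RT_PERIOD_US = 1_000_000
RT_RUNTIME_US = 950_000

# (runtime_ns, period_ns) per CPU, as passed to chrt -d
DL_TASKS = {
    0: [(2_000_000, 20_000_000)],    # control decision
    1: [(5_000_000, 50_000_000)],    # LiDAR processing
    2: [(20_000_000, 100_000_000)],  # camera recognition
}


def admitted(tasks_per_cpu, rt_runtime_us=RT_RUNTIME_US, rt_period_us=RT_PERIOD_US):
    """Per-CPU check that sum(runtime/period) stays within the RT bandwidth limit."""
    limit = Fraction(rt_runtime_us, rt_period_us)
    result = {}
    for cpu, tasks in tasks_per_cpu.items():
        bandwidth = sum((Fraction(r, p) for r, p in tasks), Fraction(0))
        result[cpu] = (float(bandwidth), bandwidth <= limit)
    return result


for cpu, (bw, ok) in admitted(DL_TASKS).items():
    print(f"CPU{cpu}: bandwidth={bw:.2f} admitted={ok}")
```

Exact fractions avoid rounding surprises right at the 0.95 boundary. With 10% to 20% per core, this design leaves ample headroom.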
4. Real-Time Task Configuration Scripts
4.1 Task Startup Script
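#!/bin/bash
# /usr/local/bin/start_rt_tasks.sh
# Starts the real-time tasks on dedicated P-cores.
# Assumes a cgroup v1 cpuset hierarchy mounted at /sys/fs/cgroup/cpuset.
# SCHED_DEADLINE refuses (EPERM) a task whose CPU affinity is narrower than its root domain,
# so "taskset -c N chrt -d ..." does not work. Each P-core instead gets an exclusive cpuset,
# which makes it a root domain of its own; CPUs listed in isolcpus= cannot become one.
# Abort on the first error
set -e

CS=/sys/fs/cgroup/cpuset
RUN_DIR=/var/run/autodrive
mkdir -p "$RUN_DIR"

# Housekeeping cpuset on the E-cores; move every movable task there
mkdir -p "$CS/system"
echo 4-7 > "$CS/system/cpuset.cpus"
echo 0 > "$CS/system/cpuset.mems"
while read -r pid; do
    echo "$pid" > "$CS/system/tasks" 2>/dev/null || true   # per-CPU kernel threads cannot be moved
done < "$CS/tasks"

# One exclusive single-CPU cpuset per P-core. With load balancing disabled at the top level,
# each of them becomes a separate root domain.
echo 0 > "$CS/cpuset.sched_load_balance"
for cpu in 0 1 2 3; do
    mkdir -p "$CS/rt_cpu$cpu"
    echo "$cpu" > "$CS/rt_cpu$cpu/cpuset.cpus"
    echo 0 > "$CS/rt_cpu$cpu/cpuset.mems"
    echo 1 > "$CS/rt_cpu$cpu/cpuset.cpu_exclusive"
done

# start_on_cpu <cpu> <command...>: the subshell joins cpuset rt_cpu<cpu>, then execs the command,
# so the task keeps the subshell's PID, which is available as $! after the call
start_on_cpu() {
    local cpu=$1
    shift
    ( echo "$BASHPID" > "$CS/rt_cpu$cpu/tasks" && exec "$@" ) &
}

# 1. Emergency braking (SCHED_FIFO priority 99, dedicated CPU3)
start_on_cpu 3 chrt -f 99 /opt/autodrive/emergency_brake \
    --response-time 5ms \
    --wcet 1ms
BRAKE_PID=$!
echo "$BRAKE_PID" > "$RUN_DIR/brake.pid"

# Memory cannot be locked from outside the process: the task itself must call
# mlockall(MCL_CURRENT | MCL_FUTURE). Here we only warn if nothing is locked.
sleep 1
if ! awk '/^VmLck:/ { found = 1; locked = ($2 > 0) } END { exit !(found && locked) }' \
        "/proc/$BRAKE_PID/status" 2>/dev/null; then
    echo "WARNING: emergency_brake has no locked memory (VmLck is 0)" >&2
fi

# SCHED_DEADLINE parameters are in nanoseconds and must satisfy runtime <= deadline <= period,
# so the kernel deadline equals the period (21/52/110 ms would be rejected with EINVAL).
# A comment cannot follow a line-continuation backslash, so the values are described above each call.

# 2. Control decision (CPU0, shortest period): runtime 2 ms, deadline 20 ms, period 20 ms
start_on_cpu 0 chrt -d \
    --sched-runtime 2000000 \
    --sched-deadline 20000000 \
    --sched-period 20000000 \
    0 /opt/autodrive/control_decision \
    --cycle 20ms \
    --deadline 21ms
CONTROL_PID=$!
echo "$CONTROL_PID" > "$RUN_DIR/control.pid"

# 3. LiDAR processing (CPU1): runtime 5 ms, deadline 50 ms, period 50 ms
start_on_cpu 1 chrt -d \
    --sched-runtime 5000000 \
    --sched-deadline 50000000 \
    --sched-period 50000000 \
    0 /opt/autodrive/lidar_processor \
    --cycle 50ms \
    --deadline 52ms
LIDAR_PID=$!
echo "$LIDAR_PID" > "$RUN_DIR/lidar.pid"

# 4. Camera recognition (CPU2): runtime 20 ms, deadline 100 ms, period 100 ms
start_on_cpu 2 chrt -d \
    --sched-runtime 20000000 \
    --sched-deadline 100000000 \
    --sched-period 100000000 \
    0 /opt/autodrive/camera_vision \
    --cycle 100ms \
    --deadline 110ms
CAMERA_PID=$!
echo "$CAMERA_PID" > "$RUN_DIR/camera.pid"

# set_irq_mask <pci-address> <hex-mask>: route a device interrupt (get_irq_for_device is the helper in 4.2)
set_irq_mask() {
    local irq
    irq=$(get_irq_for_device "$1" || true)
    if [ -n "$irq" ]; then
        echo "$2" > "/proc/irq/$irq/smp_affinity" || echo "WARNING: cannot set affinity of IRQ $irq" >&2
    else
        echo "WARNING: no IRQ found for $1" >&2
    fi
}

# Interrupt affinity
configure_interrupt_affinity() {
    set_irq_mask 0000:01:00.0 2   # LiDAR  -> CPU1 (binary 0010)
    set_irq_mask 0000:02:00.0 4   # camera -> CPU2 (binary 0100)
    set_irq_mask 0000:03:00.0 8   # brake  -> CPU3 (binary 1000)
    # The control-decision task is timer-driven and needs no external interrupt
}

# Network priority for control-decision messages (if the in-vehicle network uses eth0)
configure_network_qos() {
    tc qdisc replace dev eth0 root handle 1: prio bands 3
    tc filter add dev eth0 protocol ip parent 1:0 prio 0 \
        u32 match ip dport 5000 0xffff flowid 1:1
    tc qdisc replace dev eth0 parent 1:1 handle 10: pfifo limit 1000
}

configure_interrupt_affinity
if ip link show eth0 > /dev/null 2>&1; then
    configure_network_qos
fi

echo "Real-time tasks started successfully"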
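The three chrt -d calls follow one pattern, so their parameters can be generated and checked before launch. This hypothetical deadline_cmd helper only builds the argument vector and does not start anything. It enforces the ordering 0 < runtime ≤ deadline ≤ period that the kernel applies to SCHED_DEADLINE parameters. A 21 ms deadline with a 20 ms period is therefore rejected here, just as sched_setattr() would reject it with EINVAL.

```python
import shlex

NS_PER_MS = 1_000_000


def deadline_cmd(runtime_ms, deadline_ms, period_ms, program, *args) -> list:
    """Build a `chrt -d` argument vector; raise ValueError if the kernel would refuse it."""
    runtime, deadline, period = (round(v * NS_PER_MS) for v in (runtime_ms, deadline_ms, period_ms))
    if not 0 < runtime <= deadline <= period:
        raise ValueError(
            f"SCHED_DEADLINE needs 0 < runtime <= deadline <= period, "
            f"got {runtime_ms}/{deadline_ms}/{period_ms} ms")
    return ["chrt", "-d",
            "--sched-runtime", str(runtime),
            "--sched-deadline", str(deadline),
            "--sched-period", str(period),
            "0", program, *args]


cmd = deadline_cmd(2, 20, 20, "/opt/autodrive/control_decision", "--cycle", "20ms")
print(shlex.join(cmd))

try:
    deadline_cmd(2, 21, 20, "/opt/autodrive/control_decision")
except ValueError as err:
    print("rejected:", err)
```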
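4.2 Interrupt Affinity Helper
#!/bin/bash
# /usr/local/bin/get_irq_for_device
# Prints the first IRQ number of a PCI device (e.g. 0000:01:00.0), or nothing if none is found.
# Devices using MSI/MSI-X may own several vectors; only the first one is printed.
DEVICE=$1
if [ -z "$DEVICE" ]; then
    echo "Usage: $0 <PCI address, e.g. 0000:01:00.0>" >&2
    exit 1
fi
SYSFS="/sys/bus/pci/devices/$DEVICE"
IRQ=""
# MSI/MSI-X: sysfs lists one entry per vector, named by its IRQ number
if [ -d "$SYSFS/msi_irqs" ]; then
    IRQ=$(ls "$SYSFS/msi_irqs" | sort -n | head -1)
fi
# Otherwise fall back to the legacy INTx interrupt (0 means none)
if [ -z "$IRQ" ] && [ -r "$SYSFS/irq" ]; then
    IRQ=$(cat "$SYSFS/irq")
    [ "$IRQ" = "0" ] && IRQ=""
fi
echo "$IRQ"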
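The same lookup can be done from Python for tooling and tests. Linux exposes a PCI function's legacy interrupt in /sys/bus/pci/devices/<address>/irq. When MSI or MSI-X is enabled, it also lists one entry per vector, named by IRQ number, under msi_irqs/. Unlike the shell helper, this sketch returns every vector. The root parameter exists only so that the function can be exercised against a fake directory tree.

```python
from pathlib import Path


def pci_irqs(address: str, root: str = "/sys/bus/pci/devices") -> list:
    """IRQ numbers of a PCI device: all MSI/MSI-X vectors if present, else the legacy IRQ."""
    dev = Path(root) / address
    msi_dir = dev / "msi_irqs"
    if msi_dir.is_dir():
        irqs = sorted(int(p.name) for p in msi_dir.iterdir() if p.name.isdigit())
        if irqs:
            return irqs
    irq_file = dev / "irq"
    if irq_file.exists():
        irq = int(irq_file.read_text().strip())
        if irq > 0:
            return [irq]
    return []
```

A multi-queue device (for example, a NIC with one vector per queue) has several vectors, and each one needs its own smp_affinity setting.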
5. Real-Time Verification Plan
5.1 Offline Schedulability Analysis
#!/usr/bin/env python3
# /opt/autodrive/schedulability_test.py
import math


class RTTask:
    def __init__(self, name, C, D, T, core=None):
        self.name = name
        self.C = C  # worst-case execution time (ms)
        self.D = D  # relative deadline (ms)
        self.T = T  # period or minimum inter-arrival time (ms)
        self.core = core
        self.U = C / T  # utilization

    def __str__(self):
        return f"{self.name}: C={self.C}ms, D={self.D}ms, T={self.T}ms, U={self.U:.3f}"


def rate_monotonic_analysis(tasks):
    """Rate-monotonic response-time analysis (fixed priority, shorter period first).

    The recurrence assumes D <= T; with D > T, later jobs of a task would also need checking.
    """
    tasks_sorted = sorted(tasks, key=lambda x: x.T)  # sort by period
    print("\nRate Monotonic analysis:")
    for i, task in enumerate(tasks_sorted):
        # Iterate R = C + sum(ceil(R / T_j) * C_j) over the higher-priority tasks j
        R = task.C
        while True:
            R_next = task.C
            for hp_task in tasks_sorted[:i]:  # higher-priority tasks
                R_next += math.ceil(R / hp_task.T) * hp_task.C
            if R_next > task.D:
                print(f"  {task.name}: not schedulable (R={R_next:.2f}ms > D={task.D}ms)")
                return False
            if R_next == R:
                print(f"  {task.name}: schedulable (R={R:.2f}ms)")
                break
            if R_next > 100 * task.T:  # guard against non-convergence
                print(f"  {task.name}: analysis did not converge")
                return False
            R = R_next
    return True


def edf_analysis(tasks):
    """EDF utilization test; exact for independent periodic tasks with D >= T."""
    total_utilization = sum(task.U for task in tasks)
    print(f"\nEDF total utilization: {total_utilization:.3f}")
    if total_utilization <= 1.0:
        print("  EDF schedulable (U_total <= 1.0)")
        return True
    print(f"  EDF not schedulable (U_total = {total_utilization:.3f} > 1.0)")
    return False


def deadline_monotonic_analysis(tasks):
    """Deadline-monotonic response-time analysis (fixed priority, shorter deadline first)."""
    tasks_sorted = sorted(tasks, key=lambda x: x.D)  # sort by relative deadline
    print("\nDeadline Monotonic analysis:")
    for i, task in enumerate(tasks_sorted):
        R = task.C
        while True:
            R_next = task.C
            for hp_task in tasks_sorted[:i]:  # higher-priority tasks
                R_next += math.ceil(R / hp_task.T) * hp_task.C
            if R_next > task.D:
                print(f"  {task.name}: not schedulable (R={R_next:.2f}ms > D={task.D}ms)")
                return False
            if R_next == R:
                print(f"  {task.name}: schedulable (R={R:.2f}ms)")
                break
            R = R_next
    return True


# Task set per core
tasks_per_core = {
    "CPU0": [RTTask("Control", 2, 21, 20)],
    "CPU1": [RTTask("LiDAR", 5, 52, 50)],
    "CPU2": [RTTask("Camera", 20, 110, 100)],
    "CPU3": [RTTask("Emergency", 1, 5, 100)],  # assumed minimum inter-arrival time of 100 ms
}

# Run the schedulability analysis
print("Autonomous driving system schedulability analysis")
print("=" * 50)
all_schedulable = True
for core, tasks in tasks_per_core.items():
    print(f"\n{core} tasks:")
    for task in tasks:
        print(f"  {task}")
    if core == "CPU3":  # SCHED_FIFO: deadline-monotonic analysis
        schedulable = deadline_monotonic_analysis(tasks)
    else:  # SCHED_DEADLINE: EDF analysis
        schedulable = edf_analysis(tasks)
    if not schedulable:
        all_schedulable = False
print("\n" + "=" * 50)
if all_schedulable:
    print("✅ All tasks are schedulable in theory")
else:
    print("❌ Some tasks may not be schedulable; the design needs adjusting")
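The utilization test in edf_analysis is exact only when every deadline is at least the period. If deadlines are tightened below the period, for example to enforce the max-latency column directly, EDF feasibility needs the processor-demand criterion. For every interval length t, the demand dbf(t) = Σ max(0, ⌊(t − D_i)/T_i⌋ + 1)·C_i must not exceed t. The sketch below is a standard formulation of that test for synchronous periodic tasks with integer parameters and is not part of the original tooling. The shared-core scenario and its tightened deadlines (C + maximum latency) are a hypothetical what-if, not values from the design.

```python
import math


def dbf(tasks, t):
    """Processor demand of (C, D, T) tasks over any interval of length t (integer ms)."""
    return sum(max(0, (t - D) // T + 1) * C for C, D, T in tasks)


def edf_demand_test(tasks):
    """Exact EDF feasibility test for synchronous periodic tasks with integer C <= D <= T."""
    if sum(C / T for C, _, T in tasks) > 1:
        return False
    # Checking absolute deadlines up to hyperperiod + max(D) suffices when U <= 1
    horizon = math.lcm(*(T for _, _, T in tasks)) + max(D for _, D, _ in tasks)
    checkpoints = {k * T + D for _, D, T in tasks for k in range((horizon - D) // T + 1)}
    return all(dbf(tasks, t) <= t for t in sorted(checkpoints))


# Hypothetical: all three SCHED_DEADLINE tasks on one P-core, with deadlines
# tightened to C + maximum allowed latency (3, 7 and 30 ms)
shared = [(2, 3, 20), (5, 7, 50), (20, 30, 100)]
print("feasible under EDF:", edf_demand_test(shared))
```

At t = 30 ms, for instance, the demand is 2·2 + 5 + 20 = 29 ms, just inside the interval. This shows how little slack the tightened deadlines would leave if the tasks shared a core.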
5.2 Online Real-Time Monitoring
#!/bin/bash
# /usr/local/bin/rt_monitor.sh
LOGFILE="/var/log/autodrive/rt_monitor.log"
ALERTFILE="/var/log/autodrive/rt_alerts.log"
mkdir -p /var/log/autodrive
# tracefs lives at /sys/kernel/tracing on current kernels, /sys/kernel/debug/tracing on older ones
TRACEFS=/sys/kernel/tracing
[ -d "$TRACEFS/events" ] || TRACEFS=/sys/kernel/debug/tracing
# Task name -> PID; must be associative (an indexed array would map every name to index 0)
declare -A TASK_PIDS

# Collect one monitoring report
monitor_rt_tasks() {
    TIMESTAMP=$(date +%Y%m%d-%H%M%S)
    echo "=== Real-time Monitor Report - $TIMESTAMP ===" >> "$LOGFILE"
    TASK_PIDS=()

    # 1. Check that each task is running
    for task in control lidar camera brake; do
        PID_FILE="/var/run/autodrive/$task.pid"
        if [ -f "$PID_FILE" ]; then
            PID=$(cat "$PID_FILE")
            if ps -p "$PID" > /dev/null 2>&1; then
                TASK_PIDS[$task]=$PID
                echo "$task: PID $PID is running" >> "$LOGFILE"
            else
                echo "ALERT: $task is not running!" | tee -a "$ALERTFILE" >> "$LOGFILE"
            fi
        else
            echo "ALERT: PID file for $task not found!" | tee -a "$ALERTFILE" >> "$LOGFILE"
        fi
    done

    # 2. Check scheduling policy and priority
    echo -e "\nScheduling Policy Check:" >> "$LOGFILE"
    for task in "${!TASK_PIDS[@]}"; do
        PID=${TASK_PIDS[$task]}
        POLICY=$(chrt -p "$PID" 2>/dev/null | awk '/policy/ {print $NF}')
        PRIORITY=$(chrt -p "$PID" 2>/dev/null | awk '/priority/ {print $NF}')
        echo "$task: policy=$POLICY, priority=$PRIORITY" >> "$LOGFILE"
    done

    # 3. Measure wakeup latency with the sched_wakeup/sched_switch tracepoints
    if [ -d "$TRACEFS/events/sched" ]; then
        echo -e "\nScheduling Latency:" >> "$LOGFILE"
        echo 0 > "$TRACEFS/tracing_on"
        echo > "$TRACEFS/trace"
        echo 1 > "$TRACEFS/events/sched/sched_switch/enable"
        echo 1 > "$TRACEFS/events/sched/sched_wakeup/enable"
        echo 1 > "$TRACEFS/tracing_on"   # recording starts only now
        sleep 1                           # collect 1 second of events
        echo 0 > "$TRACEFS/tracing_on"
        echo 0 > "$TRACEFS/events/sched/sched_switch/enable"
        echo 0 > "$TRACEFS/events/sched/sched_wakeup/enable"
        for task in "${!TASK_PIDS[@]}"; do
            PID=${TASK_PIDS[$task]}
            # sched_wakeup has no latency field: pair each wakeup of the PID with the next
            # sched_switch onto it and keep the largest gap (trace timestamps are in seconds)
            MAX_LATENCY=$(awk -v pid="$PID" '
                {
                    ts = ""
                    for (i = 1; i <= NF; i++) if ($i ~ /^[0-9]+\.[0-9]+:$/) { ts = substr($i, 1, length($i) - 1); break }
                }
                ts == "" { next }
                /sched_wakeup:/ && $0 ~ (" pid=" pid " ") { woke = ts }
                /sched_switch:/ && $0 ~ (" next_pid=" pid " ") && woke != "" {
                    lat = (ts - woke) * 1e9
                    if (lat > max) max = lat
                    woke = ""
                }
                END { if (max > 0) printf "%d\n", max }' "$TRACEFS/trace")
            if [ -n "$MAX_LATENCY" ]; then
                echo "$task: max wakeup latency = ${MAX_LATENCY}ns" >> "$LOGFILE"
                if [ "$MAX_LATENCY" -gt 1000000 ]; then   # above 1 ms
                    echo "ALERT: $task wakeup latency ${MAX_LATENCY}ns > 1ms" | \
                        tee -a "$ALERTFILE" >> "$LOGFILE"
                fi
            fi
        done
    fi

    # 4. P-core utilization (ten 1-second samples)
    echo -e "\nCPU Utilization (last 10s):" >> "$LOGFILE"
    mpstat -P 0-3 1 10 | grep Average >> "$LOGFILE"

    # 5. Locked memory per task (VmLck)
    echo -e "\nMemory Lock Status:" >> "$LOGFILE"
    for task in "${!TASK_PIDS[@]}"; do
        PID=${TASK_PIDS[$task]}
        LOCKED=$(awk '/^VmLck:/ {print $2}' "/proc/$PID/status" 2>/dev/null)
        if [ -n "$LOCKED" ]; then
            echo "$task: locked memory = $LOCKED kB" >> "$LOGFILE"
        fi
    done

    # 6. Interrupt counts on CPU0-3 (header plus the first 20 IRQs)
    echo -e "\nInterrupt Statistics:" >> "$LOGFILE"
    awk 'NR == 1 { print "IRQ", $1, $2, $3, $4; next } { print $1, $2, $3, $4, $5 }' /proc/interrupts | \
        head -21 >> "$LOGFILE"
    echo -e "\n" >> "$LOGFILE"
}

# Main monitoring loop
while true; do
    monitor_rt_tasks
    sleep 10   # monitor every 10 seconds
    # Notify if alerts were raised
    if [ -s "$ALERTFILE" ]; then
        # Hook for e-mail or network notification, e.g.:
        # send_alert "$(tail -n 10 "$ALERTFILE")"
        echo "Alerts detected, check $ALERTFILE"
        # Truncate the alert file to avoid repeated notifications
        > "$ALERTFILE"
    fi
done
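The wakeup-latency measurement pairs each sched_wakeup event for a PID with the next sched_switch onto that PID. Below is the same logic in Python, which is easier to unit-test than an inline awk program. It assumes the default ftrace text format, in which the event timestamp in seconds is the token ending in ":" just before the event name. The function name is illustrative, and the sample lines in the test are synthetic.

```python
import re

# "<timestamp>: <event>: <fields>" as printed in the ftrace "trace" file
EVENT_RE = re.compile(r"\s(\d+\.\d+):\s+(sched_wakeup|sched_switch):\s(.*)$")


def max_wakeup_latency_ns(trace_lines, pid):
    """Largest delay (ns) between a sched_wakeup of pid and the next switch to it, or None."""
    wake_re = re.compile(rf"(?:^|\s)pid={pid}\b")
    switch_re = re.compile(rf"\snext_pid={pid}\b")
    woke_at = None
    worst = None
    for line in trace_lines:
        m = EVENT_RE.search(line)
        if not m:
            continue
        ts, event, fields = float(m.group(1)), m.group(2), m.group(3)
        if event == "sched_wakeup" and wake_re.search(fields):
            woke_at = ts
        elif event == "sched_switch" and woke_at is not None and switch_re.search(fields):
            latency = round((ts - woke_at) * 1e9)
            worst = latency if worst is None else max(worst, latency)
            woke_at = None
    return worst
```

Typical use: pass open("/sys/kernel/tracing/trace") as trace_lines together with a PID read from /var/run/autodrive/*.pid.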
5.3 Stress Testing and Worst-Case Validation
#!/bin/bash
# /opt/autodrive/stress_test.sh
LOG_DIR=/var/log/autodrive
mkdir -p "$LOG_DIR"
echo "Starting stress test for autonomous driving system"
echo "This will run for 5 minutes under maximum load"

# 1. Load generators, confined to the E-cores (CPU4-7)
taskset -c 4-7 stress-ng --cpu 4 --cpu-method all --timeout 300 &   # CPU load
taskset -c 4-7 stress-ng --vm 2 --vm-bytes 2G --timeout 300 &        # memory load
taskset -c 4-7 stress-ng --io 4 --timeout 300 &                      # I/O load

# 2. Run cyclictest on each P-core (cyclictest reports latencies in microseconds)
echo "Testing real-time performance under stress..."
for cpu in 0 1 2 3; do
    (
        # If the P-core has its own exclusive cpuset, join it first so that taskset may use that CPU
        cs=/sys/fs/cgroup/cpuset/rt_cpu$cpu
        if [ -d "$cs" ]; then echo "$BASHPID" > "$cs/tasks"; fi
        exec taskset -c "$cpu" cyclictest -t1 -p 80 -i 1000 -m -q \
            --histogram=1000 --duration=300
    ) > "$LOG_DIR/stress_cpu${cpu}.log" 2>&1 &
done

# 3. Record scheduling events system-wide with perf
echo "Monitoring real-time task performance..."
perf record \
    -e sched:sched_switch \
    -e sched:sched_wakeup \
    -e sched:sched_wakeup_new \
    -e sched:sched_stat_runtime \
    -a -g -o "$LOG_DIR/perf_stress.data" \
    sleep 300 &

# 4. Wait until every background job (stressors, cyclictest, perf) has finished
echo "Stress test running for 300 seconds..."
wait

# 5. Analyze the cyclictest histogram summaries ("# Max Latencies: 00031", values in us)
echo "Analyzing results..."
for cpu in 0 1 2 3; do
    LOG="$LOG_DIR/stress_cpu${cpu}.log"
    echo -e "\nCPU$cpu Results:"
    MAX_LAT=$(awk '/^# Max Latencies:/ {print $4}' "$LOG")
    AVG_LAT=$(awk '/^# Avg Latencies:/ {print $4}' "$LOG")
    OVERFLOWS=$(awk '/^# Histogram Overflows:/ {print $4}' "$LOG")
    echo "  Max Latency: ${MAX_LAT}us"
    echo "  Avg Latency: ${AVG_LAT}us"
    echo "  >=1ms samples (histogram overflows): ${OVERFLOWS}"
    # Check the 2 ms (2000 us) limit; 10# stops leading zeros from being read as octal
    if [ -z "$MAX_LAT" ]; then
        echo "  ❌ FAIL: no cyclictest summary in $LOG"
    elif [ $((10#$MAX_LAT)) -gt 2000 ]; then
        echo "  ❌ FAIL: Max latency > 2ms"
    else
        echo "  ✅ PASS: Max latency within limits"
    fi
done

# Generate the perf report
perf report -i "$LOG_DIR/perf_stress.data" \
    --stdio --sort comm,dso > "$LOG_DIR/perf_report.txt"
echo "Stress test completed. Reports saved to $LOG_DIR/"
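6. System Maintenance and Fault Handling
6.1 Automatic Recovery Script
#!/bin/bash
# /usr/local/bin/rt_recovery.sh
# Detects failed real-time tasks and restarts them
CS=/sys/fs/cgroup/cpuset
LOG=/var/log/autodrive/recovery.log

# Alert hook: logs to syslog/journal; extend with e-mail or network notification as needed
send_alert() {
    logger -t rt_recovery "$1"
}

# start_on_cpu <cpu> <command...>: start the command inside the exclusive single-CPU cpuset
# rt_cpu<cpu>, creating it if missing. SCHED_DEADLINE tasks cannot be pinned with taskset (EPERM);
# their CPU must be a root domain of its own (root cpuset.sched_load_balance = 0, child cpu_exclusive = 1).
# After the call, $! holds the new task's PID.
start_on_cpu() {
    local cpu=$1
    shift
    local dir="$CS/rt_cpu$cpu"
    if [ ! -d "$dir" ]; then
        mkdir -p "$dir"
        echo "$cpu" > "$dir/cpuset.cpus"
        echo 0 > "$dir/cpuset.mems"
        echo 1 > "$dir/cpuset.cpu_exclusive"
    fi
    ( echo "$BASHPID" > "$dir/tasks" && exec "$@" ) &
}

check_and_restart_task() {
    local task_name=$1
    local pid_file="/var/run/autodrive/$task_name.pid"
    local cpu pid state
    local -a cmd
    # SCHED_DEADLINE parameters (ns) satisfy runtime <= deadline <= period
    case $task_name in
        brake)   cpu=3; cmd=(chrt -f 99 /opt/autodrive/emergency_brake) ;;
        control) cpu=0; cmd=(chrt -d --sched-runtime 2000000 --sched-deadline 20000000 --sched-period 20000000 0 /opt/autodrive/control_decision) ;;
        lidar)   cpu=1; cmd=(chrt -d --sched-runtime 5000000 --sched-deadline 50000000 --sched-period 50000000 0 /opt/autodrive/lidar_processor) ;;
        camera)  cpu=2; cmd=(chrt -d --sched-runtime 20000000 --sched-deadline 100000000 --sched-period 100000000 0 /opt/autodrive/camera_vision) ;;
        *)       return 1 ;;
    esac

    if [ ! -f "$pid_file" ]; then
        echo "$(date): PID file for $task_name not found" >> "$LOG"
        return
    fi
    pid=$(cat "$pid_file")
    # Field 3 of /proc/<pid>/stat is the state; "Z" means exited but not yet reaped,
    # and kill -0 still succeeds for such a zombie
    state=$(awk '{print $3}' "/proc/$pid/stat" 2>/dev/null)
    if ! kill -0 "$pid" 2>/dev/null || [ "$state" = "Z" ]; then
        echo "$(date): $task_name is not running, restarting..." >> "$LOG"
        # Reap the old task if this script started it (a no-op otherwise)
        wait "$pid" 2>/dev/null
        # Restart and record the new PID
        start_on_cpu "$cpu" "${cmd[@]}"
        local new_pid=$!
        echo "$new_pid" > "$pid_file"
        echo "$(date): $task_name restarted with PID $new_pid" >> "$LOG"
        # Raise an alert
        send_alert "Real-time task $task_name restarted"
    fi
}

# Main recovery loop
while true; do
    for task in brake control lidar camera; do
        check_and_restart_task "$task"
    done
    sleep 5   # check every 5 seconds
done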
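kill -0 only asks whether a signal could be delivered. It fails with ESRCH when the process does not exist, but also with EPERM when the process exists and belongs to another user. It succeeds for a zombie, that is, a task that has exited but has not yet been reaped by its parent. A restarter that launches tasks as its own children must therefore treat the zombie state as dead. Below is a Linux-only Python sketch of a check that distinguishes these cases (process_state is a hypothetical name).

```python
import os


def process_state(pid: int) -> str:
    """Classify pid as 'alive', 'zombie' or 'gone' (Linux, reads /proc)."""
    try:
        os.kill(pid, 0)              # signal 0: existence/permission probe only
    except ProcessLookupError:       # ESRCH: no such process
        return "gone"
    except PermissionError:          # EPERM: exists, but owned by another user
        pass
    try:
        with open(f"/proc/{pid}/stat") as f:
            stat = f.read()
    except FileNotFoundError:        # exited between the two checks
        return "gone"
    # The state letter follows the parenthesised command name, which may contain spaces
    state = stat.rsplit(")", 1)[1].split()[0]
    return "zombie" if state == "Z" else "alive"


print(process_state(os.getpid()))
```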
6.2 Performance Data Analysis Tool
#!/usr/bin/env python3
# /opt/autodrive/analyze_rt_performance.py
import json
from datetime import datetime

import matplotlib
matplotlib.use("Agg")  # render straight to files; no display is needed on the target
import matplotlib.pyplot as plt
import numpy as np


class RTPerformanceAnalyzer:
    def __init__(self, log_dir="/var/log/autodrive"):
        self.log_dir = log_dir
        self.tasks = ['brake', 'control', 'lidar', 'camera']

    def parse_latency_logs(self):
        """Parse per-task latency logs (assumed format: a 'latency=123456ns' token per line)."""
        latency_data = {}
        for task in self.tasks:
            log_file = f"{self.log_dir}/{task}_latency.log"
            try:
                with open(log_file, 'r') as f:
                    lines = f.readlines()
            except FileNotFoundError:
                print(f"Warning: Log file not found for {task}")
                continue
            latencies = []
            for line in lines:
                for part in line.split():
                    if part.startswith("latency="):
                        latencies.append(int(part.split('=', 1)[1].replace('ns', '')))
            if latencies:
                latency_data[task] = {
                    'min': min(latencies),
                    'max': max(latencies),
                    'avg': sum(latencies) / len(latencies),
                    'p99': float(np.percentile(latencies, 99)),
                    'p999': float(np.percentile(latencies, 99.9)),
                    'samples': latencies,
                }
        return latency_data

    def generate_report(self, data):
        """Build the performance report and save it as JSON."""
        report = {
            'timestamp': datetime.now().isoformat(),
            'tasks': {},
            'compliance': {},
            'recommendations': [],
        }
        # Latency limits in ns, from the task table's maximum-allowed-latency column
        # (for the brake task, its 5 ms response requirement)
        requirements = {
            'brake': {'max': 5_000_000},    # 5 ms
            'control': {'max': 1_000_000},  # 1 ms
            'lidar': {'max': 2_000_000},    # 2 ms
            'camera': {'max': 10_000_000},  # 10 ms
        }
        for task, metrics in data.items():
            report['tasks'][task] = metrics
            if task in requirements:
                limit = requirements[task]['max']
                if metrics['max'] <= limit:
                    report['compliance'][task] = 'PASS'
                else:
                    report['compliance'][task] = 'FAIL'
                    report['recommendations'].append(
                        f"{task}: max latency {metrics['max']/1e6:.2f}ms "
                        f"exceeds the {limit/1e6:.2f}ms requirement"
                    )
        # Save the report
        report_file = f"{self.log_dir}/performance_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)
        return report

    def plot_latency_distribution(self, data):
        """Plot one latency histogram per task."""
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.flatten()
        for idx, task in enumerate(self.tasks):
            if task not in data:
                continue
            samples = data[task]['samples']
            ax = axes[idx]
            # Histogram in milliseconds
            ax.hist(np.array(samples) / 1e6, bins=50, alpha=0.7)
            ax.set_xlabel('Latency (ms)')
            ax.set_ylabel('Frequency')
            ax.set_title(f'{task.capitalize()} Task Latency Distribution')
            # Summary statistics in the top-right corner
            stats_text = (
                f"Min: {data[task]['min']/1e6:.2f}ms\n"
                f"Max: {data[task]['max']/1e6:.2f}ms\n"
                f"Avg: {data[task]['avg']/1e6:.2f}ms\n"
                f"P99: {data[task]['p99']/1e6:.2f}ms"
            )
            ax.text(0.95, 0.95, stats_text,
                    transform=ax.transAxes,
                    verticalalignment='top',
                    horizontalalignment='right',
                    bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
        plt.tight_layout()
        plot_file = f"{self.log_dir}/latency_distribution_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        plt.savefig(plot_file, dpi=150)
        plt.close(fig)
        return plot_file


if __name__ == "__main__":
    analyzer = RTPerformanceAnalyzer()
    print("Analyzing real-time performance data...")
    # Parse the latency logs
    latency_data = analyzer.parse_latency_logs()
    if latency_data:
        # Generate the report
        report = analyzer.generate_report(latency_data)
        print("\nPerformance Summary:")
        print("=" * 60)
        for task, compliance in report['compliance'].items():
            status = "✅" if compliance == 'PASS' else "❌"
            max_latency = report['tasks'][task]['max'] / 1e6
            print(f"{task:15} {status} {compliance:6} Max: {max_latency:6.2f}ms")
        print("\n" + "=" * 60)
        if report['recommendations']:
            print("\nRecommendations:")
            for rec in report['recommendations']:
                print(f"  • {rec}")
        # Generate the plot
        plot_file = analyzer.plot_latency_distribution(latency_data)
        print(f"\nLatency distribution plot saved to: {plot_file}")
    else:
        print("No latency data found for analysis")
7. Summary and Contingency Plans
7.1 System Health Checklist
#!/bin/bash
# /usr/local/bin/health_check.sh
echo "Autonomous Driving System Health Check"
echo "======================================"
check_passed=0
check_failed=0

# check_item <description> <command> <expected pattern>
check_item() {
    local description=$1
    local command=$2
    local expected=$3
    if eval "$command" 2>/dev/null | grep -q "$expected"; then
        echo "✅ $description"
        check_passed=$((check_passed + 1))
        return 0
    else
        echo "❌ $description"
        check_failed=$((check_failed + 1))
        return 1
    fi
}

# 1. Kernel: mainline RT kernels report "PREEMPT_RT" in uname -v, older RT-patched kernels "PREEMPT RT"
check_item "Kernel is PREEMPT_RT" "uname -v" "PREEMPT.RT"
# 2. CPU isolation: tickless P-cores and the isolated brake core
check_item "CPU0-3 are nohz_full" "cat /sys/devices/system/cpu/nohz_full" "0-3"
check_item "CPU3 is isolated" "cat /sys/devices/system/cpu/isolated" "3"
# 3. Real-time tasks are running
for task in control lidar camera brake; do
    check_item "$task task is running" "ps aux | grep -v grep | grep $task" "$task"
done
# 4. Scheduling policies
check_item "Control task is SCHED_DEADLINE" "chrt -p \$(pidof control_decision)" "SCHED_DEADLINE"
check_item "Brake task is SCHED_FIFO" "chrt -p \$(pidof emergency_brake)" "SCHED_FIFO"
# 5. CPU frequency governor
check_item "CPU0 in performance mode" "cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor" "performance"
# 6. Memory locking: the VmLck line is always present, so check that its value is non-zero
check_item "Brake task memory is locked" \
    "awk '/^VmLck:/ { print (\$2 > 0 ? \"locked\" : \"unlocked\") }' /proc/\$(pidof emergency_brake)/status" "^locked"
# 7. Interrupt affinity: the brake device's IRQ is routed to CPU3
check_item "Brake IRQ on CPU3" "cat /proc/irq/\$(get_irq_for_device 0000:03:00.0)/smp_affinity_list" "^3$"
echo "======================================"
echo "Passed: $check_passed, Failed: $check_failed"
if [ $check_failed -eq 0 ]; then
    echo "✅ System health check PASSED"
    exit 0
else
    echo "❌ System health check FAILED"
    exit 1
fi
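The CPU-isolation check above compares a sysfs CPU list as text. An equivalent list written differently (0,1,2,3 instead of 0-3) or a superset such as 0-7 is therefore misjudged. Parsing the kernel's CPU-list format into sets makes the comparison exact. The sketch below uses illustrative names. It skips non-numeric tokens, such as isolcpus flags or the "(null)" that nohz_full shows when empty, and it does not support the a-b:c/d stride syntax.

```python
def parse_cpulist(text: str) -> set:
    """Parse a kernel CPU list such as '0-3,6' into {0, 1, 2, 3, 6}."""
    cpus = set()
    for token in text.strip().split(","):
        token = token.strip()
        if "-" in token:
            lo, hi = token.split("-", 1)
            if lo.isdigit() and hi.isdigit():
                cpus.update(range(int(lo), int(hi) + 1))
        elif token.isdigit():
            cpus.add(int(token))
    return cpus


def read_cpulist(path: str) -> set:
    """Read a sysfs CPU-list file; a missing or unreadable file counts as empty."""
    try:
        with open(path) as f:
            return parse_cpulist(f.read())
    except OSError:
        return set()


if __name__ == "__main__":
    nohz = read_cpulist("/sys/devices/system/cpu/nohz_full")
    print("P-cores tickless:", {0, 1, 2, 3} <= nohz)
```

A subset test such as {0, 1, 2, 3} <= parse_cpulist(text) holds for "0-3", "0,1,2,3" and "0-7" alike.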
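7.2 Emergency Response Plan
| Failure scenario | Detection | Automatic recovery | Manual intervention |
|---|---|---|---|
| Real-time task crash | Process monitoring script | Restart the task automatically | Investigate the cause of the crash |
| Scheduling latency over limit | Latency monitoring | Restart the affected task | Analyze the system load |
| CPU thermal throttling | Temperature monitoring | Reduce the load from non-critical tasks | Inspect the cooling system |
| Out of memory | OOM monitoring | Terminate low-priority processes | Add physical memory |
| Network outage | Heartbeat detection | Switch to the backup network interface | Inspect the network hardware |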
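With this design, the real-time tasks of the autonomous-driving system run on dedicated cores under policies matched to their timing characteristics and are continuously monitored. The offline analysis, stress tests and health checks provide the evidence that the timing requirements are met under the tested conditions. The system recovers automatically from task failures, and its monitoring data and logs support root-cause analysis when problems occur.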