如果你用 Python 写过生产级的异步应用,一定经历过这个时刻:凌晨三点,日志里突然蹦出 RuntimeError: Event loop is closed,或者更诡异的 Cannot run the event loop while another loop is running。你盯着屏幕,看着那段明明在本地测试完美的代码,陷入了沉思。
这不是你的错。这是 Python 的 asyncio 给我们设下的结构性陷阱——尤其是在你需要将同步生态(如 OpenAI SDK、传统 HTTP 库)与异步架构强行嫁接时。
Hermes,这个开源的 AI Agent 框架,选择正面硬刚这个问题。通过剖析它的源码,我发现它并没有使用什么“银弹”,而是针对四种不同的并发场景,设计了四种截然不同的事件循环策略。这种精细化的循环治理,恰恰是 高并发系统 设计的精髓。
今天,我们就来拆解 Hermes 的四重事件循环架构。
核心矛盾:客户端与循环的绑定困境
在深入案例之前,我们必须理解问题的本质。现代 AI SDK(如 OpenAI)为了性能,会维护持久的 HTTP 连接(通过 httpx.AsyncClient)。但这里有个隐晦的约束:异步客户端与其创建时所在的 event loop 是强绑定的。
这意味着什么?
- 你不能在 Loop A 创建客户端,然后在 Loop B 中使用它
- 当 Loop A 关闭后,客户端的清理逻辑(
__del__ 中的 aclose())如果被执行,就会因为找不到原循环而崩溃
- 在多线程环境下,每个线程默认没有自己的循环,获取
get_event_loop() 可能返回主循环,也可能报错
Hermes 的所有设计,都是围绕“如何在不破坏这种绑定关系的前提下,实现灵活调度”这一核心命题展开的。
第一重:持久循环与客户端缓存(The Persistent Loop)
场景: CLI 交互工具,需要频繁调用 Vision API 分析图片。
陷阱: 如果每次对话都创建新的 AsyncOpenAI 客户端,TCP 三次握手的开销会让你崩溃;但如果简单地在全局缓存一个客户端实例,当程序进入 idle 状态(如 prompt_toolkit 的等待循环),之前绑定的 loop 可能已关闭,此时垃圾回收触发,直接 RuntimeError。
Hermes 的解法:
源码在 auxiliary_client.py 中展示了一个精妙的三层缓存策略:
# 核心洞察:用 loop 的身份 ID 作为缓存键的一部分
current_loop = asyncio.get_event_loop()
loop_id = id(current_loop) # 内存地址作为唯一标识
cache_key = (provider, async_mode, base_url, api_key, loop_id)
这确保了“同循环复用,跨循环隔离”。但还不够,Hermes 更进一步,做了一件看似暴力实则精准的事:
def neuter_async_httpx_del():
"""让 AsyncHttpxClientWrapper.__del__ 变成空操作"""
AsyncHttpxClientWrapper.__del__ = lambda self: None
为什么敢这么干? 因为 Python 的垃圾回收器是非确定性的。当 __del__ 被调用时,客户端绑定的 loop 可能早已寿终正寝(特别是 CLI 工具的交互循环和工具调用的工作循环是分开的)。与其让 SDK 在错误的时机尝试清理,不如接管控制权,在程序退出时手动、有序地关闭:
def shutdown_cached_clients():
for key, (client, _, loop) in _client_cache.items():
if loop is None or not loop.is_closed():
_force_close_async_httpx(client)
设计智慧: 承认异步资源的生命周期复杂性,拒绝依赖语言的自动清理机制,改为显式生命周期管理。
第二重:线程隔离与并发代理(The Thread-Local Loop)
场景: 父 Agent 需要同时委派三个子 Agent 调研不同项目,要求真正的并行而非协程的并发。
陷阱: 如果在主循环中创建子 Agent,它们共享同一个 loop。当子 Agent 的线程结束时,其创建的异步客户端可能还在后台尝试清理,而此时主循环仍在运行——跨线程的 loop 污染发生了。
Hermes 的解法:
每个子 Agent 在独立的 ThreadPoolExecutor 工作线程中运行,并且每个线程维护自己的独立 event loop:
def _run_single_child(task_index, goal, child, parent_agent):
# 在这个线程的上下文中运行
result = child.run_conversation(user_message=goal)
# 线程结束时,其内部的 loop 也随之消亡
# 生命周期完美同步,没有跨循环引用
配合 as_completed() 实现优雅的并发控制:
for future in as_completed(futures):
entry = future.result()
# 实时进度反馈,每个完成的任务立即更新 UI
spinner_ref.print_above(f"✓ [{completed_count}/{n_tasks}] {label}")
关键洞察: 线程与 loop 的同生共死。当线程池回收线程时,其关联的 loop 也一并销毁,客户端资源在正确的上下文中被释放,彻底杜绝了“Loop is closed” 错误。
第三重:线程卸载与嵌套循环(The Offloading Pattern)
场景: Hermes Gateway 需要同时处理 10 个 Telegram 消息,但某些遗留 API 调用是阻塞的(如使用 requests 库)。
陷阱: 直接在 async 函数中调用阻塞操作会冻结整个事件循环。更糟糕的是,如果这个阻塞函数内部为了兼容而尝试 asyncio.run(),你会遭遇 asyncio 的终极禁令:禁止嵌套事件循环。
Hermes 的解法:
使用 asyncio.to_thread() 将阻塞操作卸载到独立线程,并在该线程中创建临时性的嵌套循环:
async def handle_message(message):
# 主循环保持极度轻量,立即交出控制权
result = await asyncio.to_thread(
legacy_blocking_api_call, # 这可能内部调用 asyncio.run()
message.user_id
)
在 legacy_blocking_api_call 内部:
def legacy_blocking_api_call(user_id):
# 现在我们在一个独立的线程中,可以安全地创建新 loop
asyncio.run(async_subtask()) # 不再报错!
架构价值: 这实现了异步-同步-异步的无损穿越。主循环永远不会被阻塞,而阻塞操作在沙盒线程中获得了自己的异步执行环境。这是一种结构化并发(Structured Concurrency)的实践——通过线程边界来隔离不同级别的异步上下文。
第四重:规模化线程池治理(The Managed Pool)
场景: 强化学习训练,同时运行 89 个评估任务,每个任务都需要频繁调用工具(如执行 shell、读写文件)。
陷阱: 如果线程池太小(如默认的 4-8 线程),89 个任务会排队等待,训练时间从几分钟膨胀到几小时;如果每个任务都创建独立线程,上下文切换开销会压垮 CPU。
Hermes 的解法:
预分配一个大规模但可控的全局线程池(默认 128 线程),专门用于运行那些“内部使用 asyncio.run() 的同步工具”:
# 全局工具线程池,生命周期与应用程序一致
_tool_executor = ThreadPoolExecutor(max_workers=128)
def resize_tool_pool(max_workers: int):
"""动态扩缩容,适应不同硬件配置"""
global _tool_executor
old_executor = _tool_executor
_tool_executor = ThreadPoolExecutor(max_workers=max_workers)
old_executor.shutdown(wait=False) # 优雅切换,不中断现有任务
关键注释揭示了设计意图:
“Size must be large enough for concurrent eval tasks (e.g., 89 TB2 tasks all making tool calls). Too small = thread pool starvation, tasks queue for minutes.”
这不仅是资源管理,更是一种背压(Backpressure)机制。128 个线程是并发上限,防止资源耗尽;而线程池的队列则天然地实现了流量整形(Traffic Shaping)。
深层架构哲学
通观这四个案例,Hermes 的事件循环策略遵循三条铁律:
1. 身份绑定原则(Identity Binding)
通过 id(current_loop) 作为缓存键,严格执行“客户端-循环”的一对一绑定。这避免了 Python 中最隐蔽的 bug:在错误的循环上调度协程。
2. 生命周期对齐(Lifecycle Alignment)
- 持久循环:用于长生命周期的工具客户端
- 线程本地循环:随线程创建而创建,随线程销毁而销毁
- 临时循环:为一次性阻塞操作提供隔离环境
循环的边界即是故障的边界。
3. 显式优于隐式(Explicit over Implicit)
neuter_async_httpx_del() 是这一哲学的极致体现。拒绝依赖 __del__ 的幽灵般的清理行为,改为在程序生命周期的关键点(如 CLI 退出、线程结束、Gateway 关闭)显式调用 shutdown_cached_clients()。
可复用的设计模式
从 Hermes 的实践中,我们可以提炼出几个生产级异步系统的设计模式:
| 模式 |
适用场景 |
实现要点 |
| Loop-Aware 缓存 |
需要复用昂贵资源(如 HTTP 客户端) |
缓存键包含 id(event_loop),定期检测 loop.is_closed() |
| 线程卸载 |
必须调用阻塞 API |
asyncio.to_thread() + 函数内部使用 asyncio.run() |
| 循环隔离 |
并发子任务需要独立异步环境 |
每个任务独占线程,线程内独立 loop |
| 池化治理 |
高并发下的资源限制 |
预分配大容量线程池(>预期并发数),支持动态扩缩容 |
结语:驯服复杂性
asyncio 常被诟病为“带着镣铐跳舞”。但在 Hermes 的架构中,我们看到了另一种可能:不是对抗复杂性,而是将其显式化、结构化、分层化。
当你下次面对 Event loop is closed 的错误时,不妨问自己:
- 我的客户端绑定在哪个循环?
- 这个循环的生命周期由谁控制?
- 当线程/程序退出时,谁在负责清理?
Hermes 用这四重循环架构告诉我们:并发编程的本质,不是让事情同时发生,而是让边界清晰可控。
毕竟,在软件工程中,清晰的边界比极致的性能更重要——尤其是在你要维护一个能同时处理 89 个 RL 任务、10 个 Gateway 消息、3 个子 Agent 并行的生产级系统时。
想探讨更多关于 Python 异步编程的实战技巧与架构思考,欢迎来 云栈社区 交流分享你的见解。