前言
本文基于 gRPC 官方文档 grpc.io/docs/languages/cpp/callback,旨在学习如何在 C++ 中使用 gRPC 的异步回调 API 编写服务器和客户端。示例沿用经典的 RouteGuide 服务,重点在于记录和分析异步回调模式下的不同RPC类型的实现与常见问题。本文更多是实践过程的学习记录,欢迎技术交流与探讨。
环境信息
- 操作系统版本:ubuntu 24.04
- CMake版本:4.2.0
- Git版本:2.43.0
- GCC版本:gcc 13.3.0
本文示例运行于 Ubuntu 24.04 系统,相关的 gRPC 依赖库需预先编译部署。下文将从一个可运行的示例项目开始。
代码获取与编译运行
首先,克隆示例代码仓库并配置好 gRPC 依赖库路径。
git clone https://github.com/EarthlyImmortal/blog_code
进入项目目录后,需要调整编译脚本以匹配本地的高版本 GCC 编译器。项目提供了一个 start_build.sh 脚本,其核心内容如下:
#!/bin/sh
mkdir -p build && cd build
cmake .. -DCMAKE_C_COMPILER=/usr/bin/gcc -DCMAKE_CXX_COMPILER=/usr/bin/g++
make -j$(nproc)
请根据本地环境修改 -DCMAKE_C_COMPILER 和 -DCMAKE_CXX_COMPILER 的路径。在 blog_code/route_guide_async_callback_api 目录下执行该脚本即可完成编译。
编译过程的终端输出显示了一系列文件被成功编译和链接:
[40%] Built target route_guide_proto
[60%] Building CXX object server/CMakeFiles/server.dir/route_guide_callback_server.cpp.o
[70%] Building CXX object client/CMakeFiles/client.dir/__common/helper.cpp.o
[80%] Building CXX object server/CMakeFiles/server.dir/__common/helper.cpp.o
[90%] Linking CXX executable client
[100%] Linking CXX executable server
[100%] Built target server
[100%] Built target client
编译成功后,分别启动服务器和客户端。
在 build/server 目录下启动服务器,指定地理特征数据库路径:
./server --db_path=../../route_guide_db.json
服务器启动日志显示成功加载了数据库并开始监听端口:
I0207 17:17:26.977556 230219 helper.cpp:158] DB parsed, loaded 100 features
I0207 17:17:26.980704 230219 route_guide_callback_server.cpp:322] Server listening on 0.0.0.0:50051
在另一个终端的 build/client 目录下启动客户端:
./client --db_path=../../route_guide_db.json
客户端运行后会依次调用四种 RPC 方法,并输出相应的结果。例如,对于 ListFeatures(服务器端流式 RPC)的调用,客户端会输出在指定矩形区域内查找到的地理特征:
Looking for features between 40, -75 and 42, -73
Found feature called Patriots Path, Mendham, NJ 07945, USA at 40.7838, -74.6144
...
ListFeatures rpc succeeded
对于 RecordRoute(客户端流式 RPC)的调用,客户端会输出模拟访问的路径点:
Visiting point 41.6851, -74.2675
Visiting point 40.7587, -74.167
...
Finished 5 features with 10 points
Passed 5 features
Travelled 841947 meters
It took 10 seconds
同时,服务器端的日志也会记录各个 RPC 的完成状态:
I0207 17:19:18.259654 230273 route_guide_callback_server.cpp:109] RPC Completed
I0207 17:19:27.27926 230270 route_guide_callback_server.cpp:158] RPC Completed
...
接下来,我们将深入分析服务器端与客户端代码中,四种 RPC 模式在异步回调 API 下的具体实现与关键细节。
服务器端代码分析与四种RPC实现
与同步 API 不同,异步回调 API 的服务器实现类需要继承自 RouteGuide::CallbackService,而非 RouteGuide::Service。两者的类定义对比如下:
// 异步回调API服务器实现类
class RouteGuideImpl final : public RouteGuide::CallbackService {
// ...
};
// 同步API服务器实现类
class RouteGuideImpl final : public RouteGuide::Service {
// ...
};
1. 一元 RPC (Unary RPC)
- Reactor 类型:
grpc::ServerUnaryReactor
- 核心回调:
OnDone(): RPC 正常完成时调用。通常用于记录日志并释放 Reactor 对象。
OnCancel(): RPC 被客户端取消时调用。
- 处理流程:
- 客户端发送包含
Point 数据的 Unary RPC 请求。
- gRPC 框架调用
GetFeature() 方法,并传入 point 和 feature 等参数,同时创建 Reactor 对象。
- 在 Reactor 的构造函数中,服务端设置
feature 的名称和位置,并调用 Finish(grpc::Status::OK) 通知框架可以发送响应。
- gRPC 框架异步地通过 HTTP/2 流发送响应数据。
- 响应发送完毕后,框架触发
OnDone() 回调,服务端在此记录“RPC Completed”并执行 delete this 销毁 Reactor。
- 可选路径:如果客户端中途取消请求,框架会触发
OnCancel() 回调,服务端记录“RPC Cancelled”。
2. 服务器端流式 RPC (Server-side streaming RPC)
- Reactor 类型:
grpc::ServerWriteReactor
- 核心回调:除了
OnDone 和 OnCancel,增加了 OnWriteDone(bool ok),用于在每个消息发送完成后驱动下一次发送或处理错误。
- 处理流程:
- 客户端发起请求,gRPC框架创建 Reactor。
- 在 Reactor 构造函数中,服务端调用
NextWrite() 启动发送流程。
NextWrite() 函数遍历地理特征列表,找到落在请求指定矩形区域内的特征,并调用 StartWrite() 发送该特征。
- 每次写入操作完成后,
OnWriteDone(bool ok) 被调用。如果 ok 为 true,则继续调用 NextWrite() 发送下一个特征;如果列表遍历完毕,则调用 Finish() 结束流。
- 所有数据发送完毕并调用
Finish() 后,最终触发 OnDone()。
关键陷阱与修复:在 OnWriteDone 中,如果写入失败 (ok 为 false),必须进行错误处理并提前结束 RPC,否则可能导致程序崩溃。原始示例代码存在此隐患:
// 修改前,没有错误处理
void OnWriteDone(bool /*ok*/) override {
NextWrite();
}
应修改为:
// 修改后,添加异常处理
void OnWriteDone(bool ok) override {
if (!ok) {
Finish(Status(grpc::StatusCode::UNKNOWN, "Unexpected Failure"));
return; // 出现异常,提前返回,避免崩溃
}
NextWrite();
}
通过模拟一个强制失败的场景(注释掉 if(!ok) 判断,直接调用 Finish 后继续执行 NextWrite),服务器会因内部断言失败而崩溃,堆栈跟踪指向 CallbackWithSuccessTag::Set。这表明在 Finish 之后继续尝试写入操作是非法的。添加 return 语句后,服务器便能正常处理并结束 RPC。
3. 客户端流式 RPC (Client-side streaming RPC)
- Reactor 类型:
grpc::ServerReadReactor
- 核心回调:
OnReadDone(bool ok),在每个消息读取完成后调用。如果读取成功 (ok为true),则处理数据并调用 StartRead() 继续读取;如果流结束 (ok为false),则汇总数据并调用 Finish() 返回响应。
- 处理流程:
- RPC 建立,创建
Recorder Reactor。
- 在构造函数中,记录开始时间、初始化计数器,并调用
StartRead(&point_) 开始读取客户端发送的点数据。
- 进入流式数据读取循环:对于客户端发送的每个
Point,OnReadDone(ok=true) 被触发,服务端处理该点数据(增加计数、检查特征、计算距离等),然后再次调用 StartRead(&point_) 准备读取下一个点。
- 客户端发送流结束信号后,
OnReadDone(ok=false) 被触发。服务端设置 RouteSummary 摘要信息(点数量、特征数量、总距离、耗时),并调用 Finish(Status::OK) 返回该摘要。
- 最终触发
OnDone(),清理资源。
4. 双向流式 RPC (Bidirectional streaming RPC)
- Reactor 类型:
grpc::ServerBidiReactor<RouteNote, RouteNote>
- 核心回调:兼具读取 (
OnReadDone) 和写入 (OnWriteDone) 回调。
- 处理流程:
- RPC 连接建立,创建
Chatter Reactor。
- 在构造函数中,初始化成员变量并调用
StartRead(¬e_) 开始读取客户端消息。
- 进入双向流式通信循环:
- 客户端发送:
OnReadDone(true) 被触发,服务端处理接收到的 note_(保存到相关注册表),然后调用 NextWrite() 准备回复。
- 服务端回复:
NextWrite() 中,服务端遍历所有需要回复的客户端,调用 StartWrite() 发送 RouteNote 响应。每次 OnWriteDone(true) 被调用,都继续 NextWrite() 发送下一个回复,直到本轮所有回复完成。
- 回复完成后,服务端保存当前
note_ 并再次调用 StartRead(¬e_) 开始新一轮读取。
- 当客户端结束流时,
OnReadDone(false) 被触发,服务端调用 Finish(Status::OK) 正常结束 RPC。
- 异常路径:如果客户端取消请求,则触发
OnCancel()。
并发安全:由于 Chatter Reactor 内部维护了一个所有请求共用的 std::vector<RouteNote> received_notes_ 容器,而 gRPC 框架可能使用线程池处理并发请求,因此每次访问该容器时都必须进行加锁保护,以确保线程安全。同样,此处的 OnWriteDone 也需要添加与服务器端流式 RPC 相同的错误处理逻辑。
客户端代码分析与四种RPC实现
异步回调 API 的客户端初始化与同步 API 类似,主要区别在于具体的请求处理逻辑,它们通常涉及更显式的异步控制。
1. 一元 RPC (Unary RPC)
一元 RPC 的客户端调用是理解异步回调模式的基础。其核心逻辑在于使用局部变量(互斥锁、条件变量)在主线程和 gRPC 内部回调线程之间进行同步。
主要流程如下:
- 发起异步调用:主线程调用
stub_->async()->GetFeature(),传入请求参数和一个 lambda 回调函数,然后立即返回。
- 主线程等待:主线程创建一个互斥锁 (
mu) 和条件变量 (cv),并在条件变量上等待 (cv.wait),直到回调函数将完成标志 done 设为 true。
- 服务端处理:gRPC 框架将请求发送至服务器,服务器处理并返回
Feature 响应。
- 异步回调处理:在一个独立的 gRPC 内部线程中,预先注册的 lambda 回调函数被触发。它检查 RPC 状态和返回的特征,设置结果和完成标志
done = true。
- 通知主线程:回调函数在修改完共享状态后,调用
cv.notify_one() 唤醒在条件变量上等待的主线程。
- 主线程继续:主线程被唤醒,从
cv.wait 返回,获取结果并继续执行。
这种模式将阻塞等待从网络 I/O 转移到了线程同步原语上,释放了 I/O 能力。
2. 服务器端流式 RPC (Server-side streaming RPC)
- Reactor 类型:
grpc::ClientReadReactor<Feature>
- 核心结构:客户端自定义一个 Reactor 类,内部持有
ClientContext、互斥锁 (mu_)、条件变量 (cv_) 以及完成标志 (done_)。
- 流程简述:
- 在 Reactor 构造函数中,调用
stub_->async()->ListFeatures() 注册自身 Reactor,并调用 StartRead(&feature_) 开始读取和 StartCall() 真正发起请求。
- 每次服务器返回一个
Feature,OnReadDone(bool ok) 被调用。如果 ok 为 true,则处理该特征并再次调用 StartRead(&feature_) 读取下一个。
- 当流结束时(或出错),
OnDone(const Status& status) 被调用。在此设置最终状态、标记完成,并通知 (cv_.notify_one()) 等待中的主线程。
- 主线程通过调用 Reactor 的
Await() 方法(内部使用 cv_.wait)阻塞,直到 OnDone 被调用。
3. 客户端流式 RPC (Client-side streaming RPC)
- Reactor 类型:
grpc::ClientWriteReactor<Point, RouteSummary>
- 关键方法:
AddHold()。在发送流式数据前调用,增加 Reactor 的内部持有计数,防止其在所有异步写入操作完成前被意外销毁。
- 流程简述:
- 在构造函数中注册 Reactor、调用
AddHold()、调用 StartCall() 发起请求。
- 通过
StartWrite(&point_) 发送第一个点。
- 每次
OnWriteDone(bool ok) 被调用,客户端延迟一段时间后,准备下一个点并再次调用 StartWrite 发送,模拟连续发送。
- 所有点发送完毕后,调用
StartWritesDone() 通知服务器端写入结束。
- 服务器返回
RouteSummary 后,触发 OnDone,通知主线程。
4. 双向流式 RPC (Bidirectional streaming RPC)
- Reactor 类型:
grpc::ClientBidiReactor<RouteNote, RouteNote>
- 流程简述:这是最复杂的模式,结合了读和写。
- 在构造函数中注册 Reactor,并同时调用
StartRead(¬e_) 和 StartWrite(¬e_) 来启动读写循环,最后调用 StartCall()。
OnWriteDone 驱动下一个消息的发送。
OnReadDone 处理接收到的服务器消息,并驱动下一次读取。
- 当读写都结束时,
OnDone 被触发,通知主线程。
总结
通过本文对 C++ gRPC 异步回调 API 在 RouteGuide 示例中四种 RPC 模式(Unary、Server Streaming、Client Streaming、Bidirectional Streaming)的实践分析,我们可以看到其核心在于 Reactor 模式的应用。每种 RPC 类型对应特定的 Reactor 类(如 ServerUnaryReactor, ServerWriteReactor),并通过重写 OnDone、OnCancel、OnReadDone、OnWriteDone 等生命周期回调函数来驱动业务逻辑。
关键点与注意事项:
- 错误处理至关重要:在
OnWriteDone、OnReadDone 等回调中,必须检查 bool ok 参数。若为 false,应及时调用 Finish 并返回,避免后续非法操作导致程序崩溃。这是在实际开发中极易忽略的陷阱。
- 线程同步:客户端为等待异步结果,常需使用互斥锁 (
std::mutex) 和条件变量 (std::condition_variable) 进行线程间同步。服务端在处理共享数据时(如双向流示例中的 received_notes_)也需考虑加锁。
- 资源管理:Reactor 对象通常需要在
OnDone 回调中通过 delete this 自行销毁。客户端流式写入前调用 AddHold() 是防止提前销毁的重要机制。
- 流程驱动:异步回调模式由事件驱动,逻辑分散在各个回调函数中。理解每种 RPC 的“协议握手”流程(如何时调用
StartRead/StartWrite,何时调用 Finish)对于编写正确的代码至关重要。
相较于同步 API,异步回调 API 提供了更高的并发潜力和控制粒度,但代价是代码结构更复杂、更易于出错。对于构建高性能的 RPC 服务端,深入理解并妥善运用此模型是 C++ 后端开发者的重要技能。建议读者结合官方文档和示例代码亲手实践,并在调试中加深理解。
参考资料
[1] C++ grpc 异步回调API教程学习, 微信公众号:mp.weixin.qq.com/s/4hU0XMHne7brzOexqw2bow
版权声明:本文由 云栈社区 整理发布,版权归原作者所有。