找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3726

积分

1

好友

513

主题
发表于 6 天前 | 查看: 28| 回复: 0

前言

本文基于 gRPC 官方文档 grpc.io/docs/languages/cpp/callback,旨在学习如何在 C++ 中使用 gRPC 的异步回调 API 编写服务器和客户端。示例沿用经典的 RouteGuide 服务,重点在于记录和分析异步回调模式下的不同RPC类型的实现与常见问题。本文更多是实践过程的学习记录,欢迎技术交流与探讨。

环境信息

  • 操作系统版本:ubuntu 24.04
  • CMake版本:4.2.0
  • Git版本:2.43.0
  • GCC版本:gcc 13.3.0

本文示例运行于 Ubuntu 24.04 系统,相关的 gRPC 依赖库需预先编译部署。下文将从一个可运行的示例项目开始。

代码获取与编译运行

首先,克隆示例代码仓库并配置好 gRPC 依赖库路径。

git clone https://github.com/EarthlyImmortal/blog_code

进入项目目录后,需要调整编译脚本以匹配本地的高版本 GCC 编译器。项目提供了一个 start_build.sh 脚本,其核心内容如下:

#!/bin/sh
mkdir -p build && cd build
cmake .. -DCMAKE_C_COMPILER=/usr/bin/gcc -DCMAKE_CXX_COMPILER=/usr/bin/g++
make -j$(nproc)

请根据本地环境修改 -DCMAKE_C_COMPILER-DCMAKE_CXX_COMPILER 的路径。在 blog_code/route_guide_async_callback_api 目录下执行该脚本即可完成编译。

编译过程的终端输出显示了一系列文件被成功编译和链接:

[40%] Built target route_guide_proto
[60%] Building CXX object server/CMakeFiles/server.dir/route_guide_callback_server.cpp.o
[70%] Building CXX object client/CMakeFiles/client.dir/__common/helper.cpp.o
[80%] Building CXX object server/CMakeFiles/server.dir/__common/helper.cpp.o
[90%] Linking CXX executable client
[100%] Linking CXX executable server
[100%] Built target server
[100%] Built target client

编译成功后,分别启动服务器和客户端。

build/server 目录下启动服务器,指定地理特征数据库路径:

./server --db_path=../../route_guide_db.json

服务器启动日志显示成功加载了数据库并开始监听端口:

I0207 17:17:26.977556 230219 helper.cpp:158] DB parsed, loaded 100 features
I0207 17:17:26.980704 230219 route_guide_callback_server.cpp:322] Server listening on 0.0.0.0:50051

在另一个终端的 build/client 目录下启动客户端:

./client --db_path=../../route_guide_db.json

客户端运行后会依次调用四种 RPC 方法,并输出相应的结果。例如,对于 ListFeatures(服务器端流式 RPC)的调用,客户端会输出在指定矩形区域内查找到的地理特征:

Looking for features between 40, -75 and 42, -73
Found feature called Patriots Path, Mendham, NJ 07945, USA at 40.7838, -74.6144
...
ListFeatures rpc succeeded

对于 RecordRoute(客户端流式 RPC)的调用,客户端会输出模拟访问的路径点:

Visiting point 41.6851, -74.2675
Visiting point 40.7587, -74.167
...
Finished 5 features with 10 points
Passed 5 features
Travelled 841947 meters
It took 10 seconds

同时,服务器端的日志也会记录各个 RPC 的完成状态:

I0207 17:19:18.259654 230273 route_guide_callback_server.cpp:109] RPC Completed
I0207 17:19:27.27926 230270 route_guide_callback_server.cpp:158] RPC Completed
...

接下来,我们将深入分析服务器端与客户端代码中,四种 RPC 模式在异步回调 API 下的具体实现与关键细节。

服务器端代码分析与四种RPC实现

与同步 API 不同,异步回调 API 的服务器实现类需要继承自 RouteGuide::CallbackService,而非 RouteGuide::Service。两者的类定义对比如下:

// 异步回调API服务器实现类
class RouteGuideImpl final : public RouteGuide::CallbackService {
    // ...
};

// 同步API服务器实现类
class RouteGuideImpl final : public RouteGuide::Service {
    // ...
};

1. 一元 RPC (Unary RPC)

  • Reactor 类型grpc::ServerUnaryReactor
  • 核心回调
    • OnDone(): RPC 正常完成时调用。通常用于记录日志并释放 Reactor 对象。
    • OnCancel(): RPC 被客户端取消时调用。
  • 处理流程
    1. 客户端发送包含 Point 数据的 Unary RPC 请求。
    2. gRPC 框架调用 GetFeature() 方法,并传入 pointfeature 等参数,同时创建 Reactor 对象。
    3. 在 Reactor 的构造函数中,服务端设置 feature 的名称和位置,并调用 Finish(grpc::Status::OK) 通知框架可以发送响应。
    4. gRPC 框架异步地通过 HTTP/2 流发送响应数据。
    5. 响应发送完毕后,框架触发 OnDone() 回调,服务端在此记录“RPC Completed”并执行 delete this 销毁 Reactor。
    6. 可选路径:如果客户端中途取消请求,框架会触发 OnCancel() 回调,服务端记录“RPC Cancelled”。

2. 服务器端流式 RPC (Server-side streaming RPC)

  • Reactor 类型grpc::ServerWriteReactor
  • 核心回调:除了 OnDoneOnCancel,增加了 OnWriteDone(bool ok),用于在每个消息发送完成后驱动下一次发送或处理错误。
  • 处理流程
    1. 客户端发起请求,gRPC框架创建 Reactor。
    2. 在 Reactor 构造函数中,服务端调用 NextWrite() 启动发送流程。
    3. NextWrite() 函数遍历地理特征列表,找到落在请求指定矩形区域内的特征,并调用 StartWrite() 发送该特征。
    4. 每次写入操作完成后,OnWriteDone(bool ok) 被调用。如果 oktrue,则继续调用 NextWrite() 发送下一个特征;如果列表遍历完毕,则调用 Finish() 结束流。
    5. 所有数据发送完毕并调用 Finish() 后,最终触发 OnDone()

关键陷阱与修复:在 OnWriteDone 中,如果写入失败 (okfalse),必须进行错误处理并提前结束 RPC,否则可能导致程序崩溃。原始示例代码存在此隐患:

// 修改前,没有错误处理
void OnWriteDone(bool /*ok*/) override {
    NextWrite();
}

应修改为:

// 修改后,添加异常处理
void OnWriteDone(bool ok) override {
    if (!ok) {
        Finish(Status(grpc::StatusCode::UNKNOWN, "Unexpected Failure"));
        return; // 出现异常,提前返回,避免崩溃
    }
    NextWrite();
}

通过模拟一个强制失败的场景(注释掉 if(!ok) 判断,直接调用 Finish 后继续执行 NextWrite),服务器会因内部断言失败而崩溃,堆栈跟踪指向 CallbackWithSuccessTag::Set。这表明在 Finish 之后继续尝试写入操作是非法的。添加 return 语句后,服务器便能正常处理并结束 RPC。

3. 客户端流式 RPC (Client-side streaming RPC)

  • Reactor 类型grpc::ServerReadReactor
  • 核心回调OnReadDone(bool ok),在每个消息读取完成后调用。如果读取成功 (oktrue),则处理数据并调用 StartRead() 继续读取;如果流结束 (okfalse),则汇总数据并调用 Finish() 返回响应。
  • 处理流程
    1. RPC 建立,创建 Recorder Reactor。
    2. 在构造函数中,记录开始时间、初始化计数器,并调用 StartRead(&point_) 开始读取客户端发送的点数据。
    3. 进入流式数据读取循环:对于客户端发送的每个 PointOnReadDone(ok=true) 被触发,服务端处理该点数据(增加计数、检查特征、计算距离等),然后再次调用 StartRead(&point_) 准备读取下一个点。
    4. 客户端发送流结束信号后,OnReadDone(ok=false) 被触发。服务端设置 RouteSummary 摘要信息(点数量、特征数量、总距离、耗时),并调用 Finish(Status::OK) 返回该摘要。
    5. 最终触发 OnDone(),清理资源。

4. 双向流式 RPC (Bidirectional streaming RPC)

  • Reactor 类型grpc::ServerBidiReactor<RouteNote, RouteNote>
  • 核心回调:兼具读取 (OnReadDone) 和写入 (OnWriteDone) 回调。
  • 处理流程
    1. RPC 连接建立,创建 Chatter Reactor。
    2. 在构造函数中,初始化成员变量并调用 StartRead(¬e_) 开始读取客户端消息。
    3. 进入双向流式通信循环:
      • 客户端发送OnReadDone(true) 被触发,服务端处理接收到的 note_(保存到相关注册表),然后调用 NextWrite() 准备回复。
      • 服务端回复NextWrite() 中,服务端遍历所有需要回复的客户端,调用 StartWrite() 发送 RouteNote 响应。每次 OnWriteDone(true) 被调用,都继续 NextWrite() 发送下一个回复,直到本轮所有回复完成。
      • 回复完成后,服务端保存当前 note_ 并再次调用 StartRead(¬e_) 开始新一轮读取。
    4. 当客户端结束流时,OnReadDone(false) 被触发,服务端调用 Finish(Status::OK) 正常结束 RPC。
    5. 异常路径:如果客户端取消请求,则触发 OnCancel()

并发安全:由于 Chatter Reactor 内部维护了一个所有请求共用的 std::vector<RouteNote> received_notes_ 容器,而 gRPC 框架可能使用线程池处理并发请求,因此每次访问该容器时都必须进行加锁保护,以确保线程安全。同样,此处的 OnWriteDone 也需要添加与服务器端流式 RPC 相同的错误处理逻辑。

客户端代码分析与四种RPC实现

异步回调 API 的客户端初始化与同步 API 类似,主要区别在于具体的请求处理逻辑,它们通常涉及更显式的异步控制。

1. 一元 RPC (Unary RPC)

一元 RPC 的客户端调用是理解异步回调模式的基础。其核心逻辑在于使用局部变量(互斥锁、条件变量)在主线程和 gRPC 内部回调线程之间进行同步

主要流程如下:

  1. 发起异步调用:主线程调用 stub_->async()->GetFeature(),传入请求参数和一个 lambda 回调函数,然后立即返回。
  2. 主线程等待:主线程创建一个互斥锁 (mu) 和条件变量 (cv),并在条件变量上等待 (cv.wait),直到回调函数将完成标志 done 设为 true
  3. 服务端处理:gRPC 框架将请求发送至服务器,服务器处理并返回 Feature 响应。
  4. 异步回调处理:在一个独立的 gRPC 内部线程中,预先注册的 lambda 回调函数被触发。它检查 RPC 状态和返回的特征,设置结果和完成标志 done = true
  5. 通知主线程:回调函数在修改完共享状态后,调用 cv.notify_one() 唤醒在条件变量上等待的主线程。
  6. 主线程继续:主线程被唤醒,从 cv.wait 返回,获取结果并继续执行。

这种模式将阻塞等待从网络 I/O 转移到了线程同步原语上,释放了 I/O 能力。

2. 服务器端流式 RPC (Server-side streaming RPC)

  • Reactor 类型grpc::ClientReadReactor<Feature>
  • 核心结构:客户端自定义一个 Reactor 类,内部持有 ClientContext、互斥锁 (mu_)、条件变量 (cv_) 以及完成标志 (done_)。
  • 流程简述
    1. 在 Reactor 构造函数中,调用 stub_->async()->ListFeatures() 注册自身 Reactor,并调用 StartRead(&feature_) 开始读取和 StartCall() 真正发起请求。
    2. 每次服务器返回一个 FeatureOnReadDone(bool ok) 被调用。如果 oktrue,则处理该特征并再次调用 StartRead(&feature_) 读取下一个。
    3. 当流结束时(或出错),OnDone(const Status& status) 被调用。在此设置最终状态、标记完成,并通知 (cv_.notify_one()) 等待中的主线程。
    4. 主线程通过调用 Reactor 的 Await() 方法(内部使用 cv_.wait)阻塞,直到 OnDone 被调用。

3. 客户端流式 RPC (Client-side streaming RPC)

  • Reactor 类型grpc::ClientWriteReactor<Point, RouteSummary>
  • 关键方法AddHold()。在发送流式数据前调用,增加 Reactor 的内部持有计数,防止其在所有异步写入操作完成前被意外销毁。
  • 流程简述
    1. 在构造函数中注册 Reactor、调用 AddHold()、调用 StartCall() 发起请求。
    2. 通过 StartWrite(&point_) 发送第一个点。
    3. 每次 OnWriteDone(bool ok) 被调用,客户端延迟一段时间后,准备下一个点并再次调用 StartWrite 发送,模拟连续发送。
    4. 所有点发送完毕后,调用 StartWritesDone() 通知服务器端写入结束。
    5. 服务器返回 RouteSummary 后,触发 OnDone,通知主线程。

4. 双向流式 RPC (Bidirectional streaming RPC)

  • Reactor 类型grpc::ClientBidiReactor<RouteNote, RouteNote>
  • 流程简述:这是最复杂的模式,结合了读和写。
    1. 在构造函数中注册 Reactor,并同时调用 StartRead(¬e_)StartWrite(¬e_) 来启动读写循环,最后调用 StartCall()
    2. OnWriteDone 驱动下一个消息的发送。
    3. OnReadDone 处理接收到的服务器消息,并驱动下一次读取。
    4. 当读写都结束时,OnDone 被触发,通知主线程。

总结

通过本文对 C++ gRPC 异步回调 API 在 RouteGuide 示例中四种 RPC 模式(Unary、Server Streaming、Client Streaming、Bidirectional Streaming)的实践分析,我们可以看到其核心在于 Reactor 模式的应用。每种 RPC 类型对应特定的 Reactor 类(如 ServerUnaryReactor, ServerWriteReactor),并通过重写 OnDoneOnCancelOnReadDoneOnWriteDone 等生命周期回调函数来驱动业务逻辑。

关键点与注意事项:

  1. 错误处理至关重要:在 OnWriteDoneOnReadDone 等回调中,必须检查 bool ok 参数。若为 false,应及时调用 Finish 并返回,避免后续非法操作导致程序崩溃。这是在实际开发中极易忽略的陷阱。
  2. 线程同步:客户端为等待异步结果,常需使用互斥锁 (std::mutex) 和条件变量 (std::condition_variable) 进行线程间同步。服务端在处理共享数据时(如双向流示例中的 received_notes_)也需考虑加锁。
  3. 资源管理:Reactor 对象通常需要在 OnDone 回调中通过 delete this 自行销毁。客户端流式写入前调用 AddHold() 是防止提前销毁的重要机制。
  4. 流程驱动:异步回调模式由事件驱动,逻辑分散在各个回调函数中。理解每种 RPC 的“协议握手”流程(如何时调用 StartRead/StartWrite,何时调用 Finish)对于编写正确的代码至关重要。

相较于同步 API,异步回调 API 提供了更高的并发潜力和控制粒度,但代价是代码结构更复杂、更易于出错。对于构建高性能的 RPC 服务端,深入理解并妥善运用此模型是 C++ 后端开发者的重要技能。建议读者结合官方文档和示例代码亲手实践,并在调试中加深理解。

参考资料

[1] C++ grpc 异步回调API教程学习, 微信公众号:mp.weixin.qq.com/s/4hU0XMHne7brzOexqw2bow

版权声明:本文由 云栈社区 整理发布,版权归原作者所有。




上一篇:MongoDB架构设计详解:从副本集到分片集群的高可用与扩展实践
下一篇:MySQL SQL注入实战:绕过代码层WAF与软WAF的联合拦截策略
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-23 10:27 , Processed in 1.009108 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表