上一节中,当客户端的请求消息抵达服务端后,会触发 ChannelInboundHandlerAdapter.channelRead() 方法。该方法将消息进行简单的格式转换后,便会交给核心的请求处理器来处理具体的业务逻辑。本文我们就来深入剖析这个处理器的实现。
请求处理器设计
为了职责清晰,我们将消息处理的完整流程封装成一个独立的 RequestHandler 类。这个类主要承担三大职责:
- 消息解码:将网络传输过来的字节流,还原为程序可识别的请求对象。
- 服务调用:根据请求对象中的信息,找到本地对应的服务方法并通过反射进行调用。
- 响应编码:将方法调用的结果(或异常)封装并编码,准备发送回客户端。
其核心代码骨架如下:
public class RequestHandler {
private final MessageProtocol protocol;
private final ServiceRegistry serviceRegistry;
public RequestHandler(MessageProtocol protocol, ServiceRegistry serviceRegistry) {
this.protocol = protocol;
this.serviceRegistry = serviceRegistry;
}
public byte[] handleRequest(byte[] data) throws Exception {
// 请求消息解码 TODO
// 通过反射技术调用目标方法 TODO
}
}
RequestHandler 依赖两个关键组件:
MessageProtocol:消息协议,专门负责请求/响应消息的编解码。
ServiceRegistry:服务注册表,这是我们之前实现的服务注册中心。通过客户端请求中携带的服务接口信息,它能快速查找到对应的本地服务实现。
接下来,我们重点实现 handleRequest 方法。
实现消息协议处理
首先,我们需要通过消息协议接口,将客户端发送过来的原始字节数组解码为结构化的 RpcRequest 对象(该对象的具体定义将在客户端部分展开)。
RpcRequest 对象中必然包含了客户端想要调用的服务接口名。拿到接口名后,我们即可查询服务注册表,获取之前注册的服务实例信息。
public byte[] handleRequest(byte[] data) throws Exception {
// 请求消息解码
RpcRequest rpcRequest = protocol.unmarshallingReqMessage(data);
String serviceName = rpcRequest.getServiceName();
ServiceInterfaceInfo serviceInterfaceInfo = serviceRegistry.getRegisteredObj(serviceName);
if (serviceInterfaceInfo == null) {
RpcResponse response = new RpcResponse();
response.setStatus(“Not Found”);
return protocol.marshallingRespMessage(response);
}
// 通过反射技术调用目标方法 TODO
}
定义并实现消息协议接口
上面的代码依赖于一个抽象的消息协议服务。我们希望它能提供两个核心能力:解码客户端请求、编码服务端响应。这正好是一对逆过程。
我们首先用接口来定义这两个行为:
public interface MessageProtocol {
/**
* 解码请求消息
*
* @param data 客户端请求数据,字节数组格式
* @return rpc 请求对象
* @throws Exception 异常
*/
RpcRequest unmarshallingReqMessage(byte[] data) throws Exception;
/**
* 编码响应消息
*
* @param response 服务端待响应对象
* @return byte[] 对象编码后的字节数组
* @throws Exception 异常
*/
byte[] marshallingRespMessage(RpcResponse response) throws Exception;
}
接口的定义非常通用,你可以使用任何序列化方案来实现它。业界常见的方案有很多,例如阿里的 Fastjson、谷歌的 Protocol Buffers、以及 JDK 原生的序列化等。关于它们的性能对比,可以参考本系列之前的理论篇文章。
出于演示和简便考虑,这里我们使用 JDK 原生的序列化方式来实现一个默认协议:
public class DefaultMessageProtocol implements MessageProtocol {
@Override
public RpcRequest unmarshallingReqMessage(byte[] data) throws Exception {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
return (RpcRequest) in.readObject();
}
@Override
public byte[] marshallingRespMessage(RpcResponse response) throws Exception {
return serialize(response);
}
private byte[] serialize(Object obj) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
out.writeObject(obj);
return baos.toByteArray();
}
}
编解码过程的核心是 ObjectInputStream 和 ObjectOutputStream 这两个类。对象输入流用于将字节数组反序列化成 RpcRequest 请求对象;对象输出流则用于将 RpcResponse 响应对象序列化成字节数组,这是Java中实现对象序列化的基础。
利用反射调用本地服务方法
经过解码,我们得到了结构清晰的 RpcRequest 对象,其中包含了目标接口名、方法名、参数类型和具体参数值。凭借这些信息,利用 Java 的反射机制,我们就能轻而易举地调用服务端本地的对应方法。
这一步是 RPC 的“灵魂”,它本质上完成了“远程调用”在服务端的本地执行。方法调用后会产生返回值,服务端需要将这个返回值(或异常信息)编码,以便通过网络传回客户端。
实现代码如下:
public byte[] handleRequest(byte[] data) throws Exception {
// 请求消息解码
// ……省略已实现的代码
try {
// 通过反射技术调用目标方法
final Method method = serviceInterfaceInfo.getClazz().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
final Object retValue = method.invoke(serviceInterfaceInfo.getObj(), rpcRequest.getParameters());
response.setStatus(“Success”);
response.setRetValue(retValue);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
response.setStatus(“Fail”);
response.setException(e);
}
return protocol.marshallingRespMessage(response);
}
代码中的 method.invoke() 正是 Java 反射 API 的典型应用。
至于 RpcResponse 响应对象,其结构比较简单,包含了调用状态、返回值、头部信息和可能的异常,代码如下:
public class RpcResponse implements Serializable {
// 调用成功或失败
private String status;
// 返回值对象
private Object retValue;
private Map<String, String> headers = new HashMap<>();
// 如果失败,返回异常对象
private Exception exception;
// 省略 getter/setter
}
项目代码结构
本小节我们一共实现了 4 个核心类/接口,它们共同构成了服务端消息处理的核心链路。为了清晰,我们将与序列化相关的类放在了新的 serialization 包下。
更新后的项目结构如下:
├── easy-rpc-spring-boot-starter
├── pom.xml
├── src
│ └── main
│ ├── java
│ │ └── com
│ │ └── leixiaoshuai
│ │ └── easyrpc
│ │ ├── annotation
│ │ │ ├── ServiceExpose.java
│ │ │ └── ServiceReference.java
│ │ ├── common
│ │ │ └── ServiceInterfaceInfo.java
│ │ ├── listener
│ │ │ └── DefaultRpcListener.java
│ │ ├── serialization
│ │ │ ├── DefaultMessageProtocol.java
│ │ │ ├── MessageProtocol.java
│ │ │ └── RpcResponse.java
│ │ └── server
│ │ ├── network
│ │ │ ├── NettyRpcServer.java
│ │ │ ├── RequestHandler.java
│ │ │ └── RpcServer.java
│ │ └── registry
│ │ ├── NacosServiceRegistry.java
│ │ ├── ServiceRegistry.java
│ │ └── ZookeeperServiceRegistry.java
│ └── resources
└── target
总结
至此,服务端处理客户端请求的完整链条已经清晰:监听请求、解码消息、反射调用、编码响应。其中,消息处理器 (RequestHandler) 是承上启下的关键模块。
关于消息协议,本文为简化使用了 JDK 原生序列化。但在实际生产或学习过程中,强烈建议你尝试实现 Fastjson、Hessian、Protocol Buffers 等其他方案。这不仅能加深你对RPC框架通信层的理解,也是应对不同性能、兼容性需求的必备技能。
服务端的三大核心模块——服务注册、网络通信、消息处理——到此已全部讲解完毕。从下一篇开始,我们将视角转向客户端,看看它是如何封装请求、发起调用并处理响应的。整个RPC的拼图即将完整。
本实战项目的完整代码已开源,欢迎在 GitHub - easy-rpc 查阅和协作。动手实践是掌握知识的最佳途径,建议你跟随文章一步步编码实现,相信会有更大的收获。如果你在学习过程中有任何想法或问题,也欢迎来到云栈社区与其他开发者一起交流探讨。