找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3821

积分

0

好友

527

主题
发表于 3 天前 | 查看: 19| 回复: 0

上一节中,当客户端的请求消息抵达服务端后,会触发 ChannelInboundHandlerAdapter.channelRead() 方法。该方法将消息进行简单的格式转换后,便会交给核心的请求处理器来处理具体的业务逻辑。本文我们就来深入剖析这个处理器的实现。

请求处理器设计

为了职责清晰,我们将消息处理的完整流程封装成一个独立的 RequestHandler 类。这个类主要承担三大职责:

  1. 消息解码:将网络传输过来的字节流,还原为程序可识别的请求对象。
  2. 服务调用:根据请求对象中的信息,找到本地对应的服务方法并通过反射进行调用。
  3. 响应编码:将方法调用的结果(或异常)封装并编码,准备发送回客户端。

其核心代码骨架如下:

public class RequestHandler {
    private final MessageProtocol protocol;
    private final ServiceRegistry serviceRegistry;

    public RequestHandler(MessageProtocol protocol, ServiceRegistry serviceRegistry) {
        this.protocol = protocol;
        this.serviceRegistry = serviceRegistry;
    }

    public byte[] handleRequest(byte[] data) throws Exception {
        // 请求消息解码 TODO
       // 通过反射技术调用目标方法 TODO
    }
}

RequestHandler 依赖两个关键组件:

  • MessageProtocol:消息协议,专门负责请求/响应消息的编解码。
  • ServiceRegistry:服务注册表,这是我们之前实现的服务注册中心。通过客户端请求中携带的服务接口信息,它能快速查找到对应的本地服务实现。

接下来,我们重点实现 handleRequest 方法。

实现消息协议处理

首先,我们需要通过消息协议接口,将客户端发送过来的原始字节数组解码为结构化的 RpcRequest 对象(该对象的具体定义将在客户端部分展开)。

RpcRequest 对象中必然包含了客户端想要调用的服务接口名。拿到接口名后,我们即可查询服务注册表,获取之前注册的服务实例信息。

public byte[] handleRequest(byte[] data) throws Exception {
    // 请求消息解码
    RpcRequest rpcRequest = protocol.unmarshallingReqMessage(data);
    String serviceName = rpcRequest.getServiceName();
    ServiceInterfaceInfo serviceInterfaceInfo = serviceRegistry.getRegisteredObj(serviceName);

    if (serviceInterfaceInfo == null) {
       RpcResponse response = new RpcResponse();
        response.setStatus(“Not Found”);
        return protocol.marshallingRespMessage(response);
    }

   // 通过反射技术调用目标方法 TODO
}

定义并实现消息协议接口

上面的代码依赖于一个抽象的消息协议服务。我们希望它能提供两个核心能力:解码客户端请求、编码服务端响应。这正好是一对逆过程。

我们首先用接口来定义这两个行为:

public interface MessageProtocol {
    /**
     * 解码请求消息
     *
     * @param data 客户端请求数据,字节数组格式
     * @return rpc 请求对象
     * @throws Exception 异常
     */
    RpcRequest unmarshallingReqMessage(byte[] data) throws Exception;

    /**
     * 编码响应消息
     *
     * @param response 服务端待响应对象
     * @return byte[] 对象编码后的字节数组
     * @throws Exception 异常
     */
    byte[] marshallingRespMessage(RpcResponse response) throws Exception;
}

接口的定义非常通用,你可以使用任何序列化方案来实现它。业界常见的方案有很多,例如阿里的 Fastjson、谷歌的 Protocol Buffers、以及 JDK 原生的序列化等。关于它们的性能对比,可以参考本系列之前的理论篇文章。

出于演示和简便考虑,这里我们使用 JDK 原生的序列化方式来实现一个默认协议:

public class DefaultMessageProtocol implements MessageProtocol {
    @Override
    public RpcRequest unmarshallingReqMessage(byte[] data) throws Exception {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
        return (RpcRequest) in.readObject();
    }

    @Override
    public byte[] marshallingRespMessage(RpcResponse response) throws Exception {
        return serialize(response);
    }

   private byte[] serialize(Object obj) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(baos);
        out.writeObject(obj);
        return baos.toByteArray();
    }
}

编解码过程的核心是 ObjectInputStreamObjectOutputStream 这两个类。对象输入流用于将字节数组反序列化成 RpcRequest 请求对象;对象输出流则用于将 RpcResponse 响应对象序列化成字节数组,这是Java中实现对象序列化的基础。

利用反射调用本地服务方法

经过解码,我们得到了结构清晰的 RpcRequest 对象,其中包含了目标接口名、方法名、参数类型和具体参数值。凭借这些信息,利用 Java 的反射机制,我们就能轻而易举地调用服务端本地的对应方法。

这一步是 RPC 的“灵魂”,它本质上完成了“远程调用”在服务端的本地执行。方法调用后会产生返回值,服务端需要将这个返回值(或异常信息)编码,以便通过网络传回客户端。

实现代码如下:

public byte[] handleRequest(byte[] data) throws Exception {
    // 请求消息解码
    // ……省略已实现的代码

    try {
        // 通过反射技术调用目标方法
        final Method method = serviceInterfaceInfo.getClazz().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
        final Object retValue = method.invoke(serviceInterfaceInfo.getObj(), rpcRequest.getParameters());
        response.setStatus(“Success”);
        response.setRetValue(retValue);
    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
        response.setStatus(“Fail”);
        response.setException(e);
    }
    return protocol.marshallingRespMessage(response);
}

代码中的 method.invoke() 正是 Java 反射 API 的典型应用。

至于 RpcResponse 响应对象,其结构比较简单,包含了调用状态、返回值、头部信息和可能的异常,代码如下:

public class RpcResponse implements Serializable {
    // 调用成功或失败
    private String status;
    // 返回值对象
    private Object retValue;
    private Map<String, String> headers = new HashMap<>();
    // 如果失败,返回异常对象
    private Exception exception;
    // 省略 getter/setter
}

项目代码结构

本小节我们一共实现了 4 个核心类/接口,它们共同构成了服务端消息处理的核心链路。为了清晰,我们将与序列化相关的类放在了新的 serialization 包下。

更新后的项目结构如下:

├── easy-rpc-spring-boot-starter
    ├── pom.xml
    ├── src
    │   └── main
    │       ├── java
    │       │   └── com
    │       │       └── leixiaoshuai
    │       │           └── easyrpc
    │       │               ├── annotation
    │       │               │   ├── ServiceExpose.java
    │       │               │   └── ServiceReference.java
    │       │               ├── common    
    │       │               │   └── ServiceInterfaceInfo.java    
    │       │               ├── listener
    │       │               │   └── DefaultRpcListener.java
    │       │               ├── serialization
    │       │               │   ├── DefaultMessageProtocol.java
    │       │               │   ├── MessageProtocol.java
    │       │               │   └── RpcResponse.java
    │       │               └── server
    │       │                   ├── network
    │       │                   │   ├── NettyRpcServer.java
    │       │                   │   ├── RequestHandler.java
    │       │                   │   └── RpcServer.java
    │       │                   └── registry
    │       │                       ├── NacosServiceRegistry.java
    │       │                       ├── ServiceRegistry.java
    │       │                       └── ZookeeperServiceRegistry.java
    │       └── resources
    └── target

总结

至此,服务端处理客户端请求的完整链条已经清晰:监听请求、解码消息、反射调用、编码响应。其中,消息处理器 (RequestHandler) 是承上启下的关键模块。

关于消息协议,本文为简化使用了 JDK 原生序列化。但在实际生产或学习过程中,强烈建议你尝试实现 Fastjson、Hessian、Protocol Buffers 等其他方案。这不仅能加深你对RPC框架通信层的理解,也是应对不同性能、兼容性需求的必备技能。

服务端的三大核心模块——服务注册网络通信消息处理——到此已全部讲解完毕。从下一篇开始,我们将视角转向客户端,看看它是如何封装请求、发起调用并处理响应的。整个RPC的拼图即将完整。

本实战项目的完整代码已开源,欢迎在 GitHub - easy-rpc 查阅和协作。动手实践是掌握知识的最佳途径,建议你跟随文章一步步编码实现,相信会有更大的收获。如果你在学习过程中有任何想法或问题,也欢迎来到云栈社区与其他开发者一起交流探讨。




上一篇:微服务架构演进:从单体到分布式,如何设计高可用系统
下一篇:使用Netty实现RPC框架:手把手构建服务端网络通信层
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-10 10:21 , Processed in 0.447015 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表