在自定义监听器完成服务接口注册后,构建RPC框架的下一步,便是启动网络通信服务器。只有服务器成功监听指定端口,客户端才能与服务端建立连接并进行通信。
项目《进阶实战:从零开始撸一个 RPC 框架》开源地址与系列导航:
https://github.com/CoderLeixiaoshuai/easy-rpc
本篇文章是该系列实战篇的延续,将重点讲解服务端网络通信容器的实现。
启动 RPC 网络通信服务容器
服务注册成功意味着服务已对外暴露。为了让客户端能够发起调用,注册完成后必须立即启动网络服务器并开始监听端口。
为了聚焦核心逻辑,我们只关注关键的启动步骤,具体代码如下:
// DefaultRpcListener.java
private void initRpcServer(ApplicationContext applicationContext) {
// 1.1 扫描服务端@ServiceExpose注解,并将服务接口信息注册到注册中心
// ……省略已实现的代码
// 1.2 启动网络通信服务器,开始监听指定端口
genericRpcServer.start();
}
在 DefaultRpcListener.java 中,通过依赖注入的 RpcService 调用其 start() 方法来启动服务器。下面我们来深入剖析这个方法的具体实现。
定义 RPC 服务容器接口
一个基础的 RPC 网络通信服务器至少需要提供启动和停止两种能力。为了代码的扩展性,我们首先定义一个清晰的接口。
public interface RpcServer {
/**
* 启动服务
*/
void start();
/**
* 停止服务
*/
void stop();
}
接口定义了 start() 和 stop() 两个无参方法。启动服务是核心,我们将重点阐述;停止服务的实现相对简单。
使用 Netty 实现 RPC 服务容器
Netty 因其高性能和易用性,成为众多分布式框架默认的网络通信组件。我们的 EasyRPC 框架也将采用它来实现服务端。
首先,创建一个 NettyRpcServer 类来实现 RpcServer 接口:
public class NettyRpcServer implements RpcServer {
private int port;
private RequestHandler requestHandler;
public NettyRpcServer(int port, RequestHandler requestHandler) {
this.port = port;
this.requestHandler = requestHandler;
}
@Override
public void start() {
// TODO
}
@Override
public void stop() {
// TODO
}
}
这里定义了两个关键成员变量:
port: Netty 服务器监听的端口号,客户端通过此端口连接服务端。
requestHandler: 请求处理器,负责处理接收到的客户端请求,包括解码、反射调用本地方法、编码响应等核心逻辑。
接下来,我们实现最重要的 start() 方法:
// NettyRpcServer.java
@Override
public void start() {
// 创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建服务端的启动对象
ServerBootstrap serverBootstrap = new ServerBootstrap()
// 设置两个线程组
.group(bossGroup, workerGroup)
// 设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
// 服务端用于接收进来的连接,也就是boosGroup线程,线程队列大小
.option(ChannelOption.SO_BACKLOG, 100)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// child 通道,worker 线程处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
// 给 pipeline 管道设置自定义的处理器
@Override
public void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new ChannelRequestHandler());
}
});
// 绑定端口号,同步启动服务
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
logger.info(“[easy-rpc]Rpc Server started on port: {}”, port);
channel = channelFuture.channel();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error(“server error.”, e);
} finally {
// 释放线程组资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
这段代码是 Netty 服务端的标准启动流程。其核心在于初始化了两个线程组:bossGroup 负责接收客户端连接,workerGroup 负责处理具体的I/O操作和业务逻辑,这也是它们名称的由来。
在 workerGroup 的管道(pipeline)中,我们添加了一个自定义的处理器 ChannelRequestHandler 来处理入站请求。下面来实现这个处理器:
// NettyRpcServer.java 内部类
private class ChannelRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info(“Server receive a massage: {}”, msg);
final ByteBuf msgBuf = (ByteBuf) msg;
final byte[] reqBytes = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(reqBytes);
// 调用请求处理器开始处理客户端请求
final byte[] respBytes = requestHandler.handleRequest(reqBytes);
logger.info(“Send response massage: {}”, respBytes);
final ByteBuf resBuf = Unpooled.buffer(respBytes.length);
resBuf.writeBytes(respBytes);
ctx.writeAndFlush(resBuf);
}
// ……省略不重要的代码
}
ChannelRequestHandler 继承自 ChannelInboundHandlerAdapter(适配器模式的典型应用)。当有客户端数据到达时,会自动触发 channelRead 方法。
在该方法中,我们将接收到的 Netty ByteBuf 类型消息转换为 byte[] 字节数组,然后交由前面定义的 requestHandler 对象进行真正的业务处理(包括反序列化、服务端注解驱动程序调用、序列化响应等)。处理完成后,再将返回的 byte[] 数据包装成 ByteBuf 写回通道,从而完成一次完整的请求-响应通信。
注意:ByteBuf 是 Netty 提供的高效字节容器,并非 JDK 自带类。关于 requestHandler.handleRequest(reqBytes) 内部具体的消息处理流程,我们将在下一篇文章中详细展开。
代码结构
本节新增了两个核心类 RpcServer(接口)和 NettyRpcServer(实现),并将其组织在 server.network 包下。项目整体结构如下:
├── easy-rpc-spring-boot-starter
├── pom.xml
├── src
│ └── main
│ ├── java
│ │ └── com
│ │ └── leixiaoshuai
│ │ └── easyrpc
│ │ ├── annotation
│ │ │ ├── ServiceExpose.java
│ │ │ └── ServiceReference.java
│ │ ├── common
│ │ │ └── ServiceInterfaceInfo.java
│ │ ├── listener
│ │ │ └── DefaultRpcListener.java
│ │ └── server
│ │ ├── network
│ │ │ ├── NettyRpcServer.java
│ │ │ └── RpcServer.java
│ │ └── registry
│ │ ├── NacosServiceRegistry.java
│ │ ├── ServiceRegistry.java
│ │ └── ZookeeperServiceRegistry.java
│ └── resources
└── target
小结
本部分我们完成了RPC框架服务端网络通信层的骨架搭建。首先抽象出 RpcServer 接口,定义了启动与停止的基本契约。随后,我们选择了高性能的 Netty 网络库进行具体实现。
NettyRpcServer 启动的核心在于初始化 boss 和 worker 两个线程组,分工明确,分别负责连接监听与请求处理。当客户端请求抵达,会触发自定义处理器 ChannelRequestHandler 的 channelRead 方法,在此我们将网络数据流转交给业务层的 RequestHandler 进行后续处理。
至此,一个能够监听端口、接收原始字节流请求的Spring监听器已经就位。下一篇文章,我们将深入 RequestHandler 的内部,揭秘如何将收到的字节流解码为具体的方法调用请求,并通过反射执行本地服务,最终将结果编码返回。欢迎持续关注,也欢迎在 云栈社区 与我们交流你的想法和疑问。