找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

4141

积分

0

好友

544

主题
发表于 3 天前 | 查看: 15| 回复: 0

在自定义监听器完成服务接口注册后,构建RPC框架的下一步,便是启动网络通信服务器。只有服务器成功监听指定端口,客户端才能与服务端建立连接并进行通信。

项目《进阶实战:从零开始撸一个 RPC 框架》开源地址与系列导航:

https://github.com/CoderLeixiaoshuai/easy-rpc

本篇文章是该系列实战篇的延续,将重点讲解服务端网络通信容器的实现。

启动 RPC 网络通信服务容器

服务注册成功意味着服务已对外暴露。为了让客户端能够发起调用,注册完成后必须立即启动网络服务器并开始监听端口。

为了聚焦核心逻辑,我们只关注关键的启动步骤,具体代码如下:

// DefaultRpcListener.java

private void initRpcServer(ApplicationContext applicationContext) {
    // 1.1 扫描服务端@ServiceExpose注解,并将服务接口信息注册到注册中心
    // ……省略已实现的代码

    // 1.2 启动网络通信服务器,开始监听指定端口
    genericRpcServer.start();
}

DefaultRpcListener.java 中,通过依赖注入的 RpcService 调用其 start() 方法来启动服务器。下面我们来深入剖析这个方法的具体实现。

定义 RPC 服务容器接口

一个基础的 RPC 网络通信服务器至少需要提供启动停止两种能力。为了代码的扩展性,我们首先定义一个清晰的接口。

public interface RpcServer {
    /**
     * 启动服务
     */
    void start();

    /**
     * 停止服务
     */
    void stop();
}

接口定义了 start()stop() 两个无参方法。启动服务是核心,我们将重点阐述;停止服务的实现相对简单。

使用 Netty 实现 RPC 服务容器

Netty 因其高性能和易用性,成为众多分布式框架默认的网络通信组件。我们的 EasyRPC 框架也将采用它来实现服务端。

首先,创建一个 NettyRpcServer 类来实现 RpcServer 接口:

public class NettyRpcServer implements RpcServer {
    private int port;
    private RequestHandler requestHandler;

    public NettyRpcServer(int port, RequestHandler requestHandler) {
        this.port = port;
        this.requestHandler = requestHandler;
    }

    @Override
    public void start() {
        // TODO
    }

    @Override
    public void stop() {
        // TODO
    }
}

这里定义了两个关键成员变量:

  • port: Netty 服务器监听的端口号,客户端通过此端口连接服务端。
  • requestHandler: 请求处理器,负责处理接收到的客户端请求,包括解码、反射调用本地方法、编码响应等核心逻辑。

接下来,我们实现最重要的 start() 方法:

// NettyRpcServer.java

@Override
public void start() {
    // 创建两个线程组
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      // 创建服务端的启动对象
      ServerBootstrap serverBootstrap = new ServerBootstrap()
        // 设置两个线程组
        .group(bossGroup, workerGroup)
        // 设置服务端通道实现类型
        .channel(NioServerSocketChannel.class)
        // 服务端用于接收进来的连接,也就是boosGroup线程,线程队列大小
        .option(ChannelOption.SO_BACKLOG, 100)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        // child 通道,worker 线程处理器
        .childHandler(new ChannelInitializer<SocketChannel>() {
          // 给 pipeline 管道设置自定义的处理器
          @Override
          public void initChannel(SocketChannel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new ChannelRequestHandler());
          }
        });

      // 绑定端口号,同步启动服务
      ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
      logger.info(“[easy-rpc]Rpc Server started on port: {}”, port);
      channel = channelFuture.channel();
      // 对关闭通道进行监听
      channelFuture.channel().closeFuture().sync();
    } catch (Exception e) {
      logger.error(“server error.”, e);
    } finally {
      // 释放线程组资源
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
}

这段代码是 Netty 服务端的标准启动流程。其核心在于初始化了两个线程组:bossGroup 负责接收客户端连接,workerGroup 负责处理具体的I/O操作和业务逻辑,这也是它们名称的由来。

workerGroup 的管道(pipeline)中,我们添加了一个自定义的处理器 ChannelRequestHandler 来处理入站请求。下面来实现这个处理器:

// NettyRpcServer.java  内部类

private class ChannelRequestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info(“Server receive a massage: {}”, msg);
        final ByteBuf msgBuf = (ByteBuf) msg;
        final byte[] reqBytes = new byte[msgBuf.readableBytes()];
        msgBuf.readBytes(reqBytes);
        // 调用请求处理器开始处理客户端请求
        final byte[] respBytes = requestHandler.handleRequest(reqBytes);
        logger.info(“Send response massage: {}”, respBytes);
        final ByteBuf resBuf = Unpooled.buffer(respBytes.length);
        resBuf.writeBytes(respBytes);
        ctx.writeAndFlush(resBuf);
    }
  // ……省略不重要的代码
}

ChannelRequestHandler 继承自 ChannelInboundHandlerAdapter(适配器模式的典型应用)。当有客户端数据到达时,会自动触发 channelRead 方法。

在该方法中,我们将接收到的 Netty ByteBuf 类型消息转换为 byte[] 字节数组,然后交由前面定义的 requestHandler 对象进行真正的业务处理(包括反序列化、服务端注解驱动程序调用、序列化响应等)。处理完成后,再将返回的 byte[] 数据包装成 ByteBuf 写回通道,从而完成一次完整的请求-响应通信。

注意:ByteBuf 是 Netty 提供的高效字节容器,并非 JDK 自带类。关于 requestHandler.handleRequest(reqBytes) 内部具体的消息处理流程,我们将在下一篇文章中详细展开。

代码结构

本节新增了两个核心类 RpcServer(接口)和 NettyRpcServer(实现),并将其组织在 server.network 包下。项目整体结构如下:

├── easy-rpc-spring-boot-starter
    ├── pom.xml
    ├── src
    │   └── main
    │       ├── java
    │       │   └── com
    │       │       └── leixiaoshuai
    │       │           └── easyrpc
    │       │               ├── annotation
    │       │               │   ├── ServiceExpose.java
    │       │               │   └── ServiceReference.java
    │       │               ├── common
    │       │               │   └── ServiceInterfaceInfo.java
    │       │               ├── listener
    │       │               │   └── DefaultRpcListener.java
    │       │               └── server
    │       │                   ├── network
    │       │                   │   ├── NettyRpcServer.java
    │       │                   │   └── RpcServer.java
    │       │                   └── registry
    │       │                       ├── NacosServiceRegistry.java
    │       │                       ├── ServiceRegistry.java
    │       │                       └── ZookeeperServiceRegistry.java
    │       └── resources
    └── target

小结

本部分我们完成了RPC框架服务端网络通信层的骨架搭建。首先抽象出 RpcServer 接口,定义了启动与停止的基本契约。随后,我们选择了高性能的 Netty 网络库进行具体实现。

NettyRpcServer 启动的核心在于初始化 bossworker 两个线程组,分工明确,分别负责连接监听与请求处理。当客户端请求抵达,会触发自定义处理器 ChannelRequestHandlerchannelRead 方法,在此我们将网络数据流转交给业务层的 RequestHandler 进行后续处理。

至此,一个能够监听端口、接收原始字节流请求的Spring监听器已经就位。下一篇文章,我们将深入 RequestHandler 的内部,揭秘如何将收到的字节流解码为具体的方法调用请求,并通过反射执行本地服务,最终将结果编码返回。欢迎持续关注,也欢迎在 云栈社区 与我们交流你的想法和疑问。




上一篇:RPC框架实战:服务端如何解码消息并利用反射调用本地方法
下一篇:FinClip:将小程序容器化,打造企业自有应用生态的技术实践
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-10 11:34 , Processed in 0.614576 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表