找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1029

积分

0

好友

140

主题
发表于 前天 08:23 | 查看: 16| 回复: 0

引子

Netty作为异步事件驱动的网络框架,凭借出色的性能和优雅的设计,已成为高并发网络通信领域的标杆。它不仅极大地简化了Java NIO编程的复杂性,还能轻松支持数万级别的并发连接,被DubboElasticsearch等众多知名开源项目选为核心通信组件。本文将深入探索如何结合Spring Boot 3Netty,从零开始构建一个高性能的即时通讯系统。

前置科普

在深入Netty的具体实现之前,我们有必要系统地回顾一下IO模型的基础知识。这有助于理解Netty为何如此高效。已经熟悉这些概念的读者可以直接跳至下一章节。

IO模型的基础概念

阻塞与非阻塞

这描述的是线程在访问资源时,针对资源是否准备就绪的一种处理态度:

  • 阻塞:线程发起资源请求后,如果资源尚未就绪,线程会一直等待,在此期间无法执行其他任何任务。
  • 非阻塞:线程发起资源请求后,无论资源是否就绪,都会立即得到一个结果(成功或失败),线程不会被挂起,可以继续执行后续任务。
同步与异步

这描述的是访问数据结果的一种机制:

  • 同步:线程需要主动等待并“盯着”资源就绪,然后由自身去获取结果。
  • 异步:线程发起请求后便不再关心,当资源就绪时,会通过回调(Callback)或事件通知等机制主动告知线程结果。
小结
  • 阻塞/非阻塞:关注的是等待结果期间,线程能不能干别的活
  • 同步/异步:关注的是谁负责主动获取结果——是自己不断轮询,还是坐等通知。

Java中的三种IO模型

BIO (Blocking I/O)

传统的同步阻塞IO模型,其工作方式非常直观。

  • 工作流程:线程发起IO请求 → 内核执行IO操作 → 线程被阻塞等待 → IO操作完成 → 线程恢复执行。
  • 资源消耗:每个连接都需要一个独立的线程来处理。当并发连接数上升时,线程数量线性增长,系统资源消耗巨大,上下文切换开销成为瓶颈。
  • 适用场景:连接数固定且较少、请求处理逻辑简单的应用。
  • 优缺点:编程模型简单易懂,但无法应对高并发场景。

1.png

NIO (Non-blocking I/O)

JDK 1.4引入的同步非阻塞IO模型,其核心是Selector(选择器)。

  • 工作流程:一个Selector线程可以轮询注册在其上的多个Channel(通道)。当某个Channel有IO事件(如连接接入、数据可读)发生时,Selector会通知应用程序进行处理。
  • 资源效率:单线程或少量线程即可管理成千上万的连接,显著提升了系统的并发处理能力和资源利用率。
  • 适用场景:高并发、低延迟的网络应用,如即时通讯、游戏服务器等。

2.png

AIO (Asynchronous I/O)

JDK 7引入的异步非阻塞IO模型,理念先进。

  • 工作流程:应用程序发起IO请求后立即返回,内核负责完成整个IO操作(如将数据从磁盘读到缓冲区),完成后通过回调函数通知应用程序。
  • 特点:真正的“发射后不管”,由操作系统驱动,应用程序无需主动轮询。
  • 局限性:在Linux系统上,其底层实现仍基于epoll,并未实现真正的异步IO,且API复杂,社区采用度不如NIO。
  • 适用场景:对吞吐量有极高要求的应用,在Windows平台的IOCP模型上表现更佳。

3.png

形象比喻
  • BIO: 你去餐厅点餐,点完后必须坐在位子上干等,直到服务员把菜端上来。每个客人都需要一个专属服务员全程服务。
  • NIO: 你去餐厅点餐,拿到一个取餐号后就可以去逛商场。但你需要时不时自己回来看大屏幕,检查你的号码是否被叫到。一个服务员可以同时服务多个客人。
  • AIO: 你去餐厅点餐,点完后留下手机号就可以离开。餐做好后,餐厅会主动打电话通知你来取。你完全不需要操心进度。

初识Netty

Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能客户端和服务器。它成功解决了原生Java NIO API复杂难用、网络异常处理繁琐以及某些平台NIO实现存在Bug等问题。

NettyNIO的基础上进行了深度优化和封装,既保留了NIO的高并发、非阻塞特性,又提供了更友好、更强大的API。它通过零拷贝技术提升网络传输效率,并内置了对多种协议(如HTTP、WebSocket、Protobuf等)的支持,具备极强的可扩展性。

:零拷贝技术旨在减少数据在内存中的拷贝次数。普通文件IO需要经历“磁盘→内核缓冲区→用户缓冲区→Socket缓冲区→网卡”多次拷贝。而零拷贝(如sendfile)允许数据直接从磁盘文件DMA拷贝到网卡缓冲区,跳过了用户空间的复制,极大提升了大文件传输性能。

线程模型

Netty提供了灵活可配的线程模型,开发者可根据应用场景选择:

单线程模型

所有IO操作(连接建立、读写、业务处理)均由同一个NioEventLoop线程完成。

  • 特点:结构简单,没有线程切换开销。
  • 适用场景:连接数少、业务处理非常轻量且快速。

4.png

多线程模型

由一个NioEventLoop线程池处理所有的IO操作。

  • 特点:充分利用多核CPU,提升了整体吞吐量。一个连接在同一时刻只被一个线程处理,避免了并发问题。
  • 适用场景:业务处理有一定耗时,连接数较多的场景。

5.png

主从多线程模型

最常用、性能最好的模型。职责分离:

  • Boss Group: 专门负责接收客户端的连接请求。
  • Worker Group: 负责处理已建立连接的IO读写和业务逻辑。
  • 特点:连接接收和业务处理分离,互不干扰,能更好地应对高并发连接。

6.png

Netty的核心机制之一是“管道”(Pipeline)处理模型。可以将其想象成一条流水线:每个网络连接(Channel)对应一条独立的流水线,数据包作为“物料”从一端进入,依次经过多个“处理器”(Handler)进行加工(如解码、验证、业务逻辑),最后从另一端输出。每个处理器只专注于自己的职责,这种设计使得代码模块化程度高、易于维护和复用,是构建复杂网络应用的理想模式。对于希望深入掌握现代服务端开发的开发者,理解这种基于事件的网络/系统架构至关重要。

生命周期

理解Netty组件的生命周期,有助于我们更好地控制程序行为并进行资源管理。

Channel的生命周期

Channel代表一个网络连接,其生命周期状态包括:

  • 注册(Registered): Channel被成功注册到EventLoop上。
  • 激活(Active): Channel连接建立完成,可以进行读写操作。
  • 非激活(Inactive): Channel连接被关闭。
  • 注销(Unregistered): Channel从EventLoop中注销。

这些状态变化会触发绑定在Channel上的ChannelHandler中的对应方法(如channelActive()channelInactive())。

Handler的生命周期

Handler是处理数据的单元,其生命周期方法包括:

  • handlerAdded(): 当Handler被添加到ChannelPipeline时调用。
  • handlerRemoved(): 当Handler从ChannelPipeline中移除时调用。
  • exceptionCaught(): 在处理过程中发生异常时调用。
服务器启动典型流程
  1. 创建引导类: 实例化ServerBootstrap
  2. 配置线程组: 设置bossGroupworkerGroup
  3. 指定Channel类型: 例如NioServerSocketChannel.class
  4. 设置处理器链: 通过ChannelInitializer配置每个新连接的Pipeline
  5. 绑定端口: 调用bind(port)方法启动服务并开始监听。

这个过程如同组装一台机器:准备零件(创建组件)、按图组装(配置连接)、通电启动(绑定端口)、机器运转(处理请求)、最终关机(释放资源)。

实时通讯技术方案选型

在构建需要实时数据交互的应用时,通常有以下几种技术方案:

Ajax轮询 (Polling)
  • 原理: 客户端以固定时间间隔(如每秒)向服务器发送HTTP请求,询问是否有新数据。
  • 优势: 实现极其简单,兼容性最好。
  • 劣势: 大量无效请求造成带宽和服务器资源浪费,实时性差(延迟至少为一个轮询间隔)。
长轮询 (Long Polling)
  • 原理: 客户端发送请求后,服务器持有连接不立即返回,直到有新数据或超时。客户端收到响应后立即发起下一个请求。
  • 优势: 减少了无效请求,实时性比短轮询好。
  • 劣势: 服务器需要维持大量并发连接,高并发下资源消耗依然很大。
WebSocket
  • 原理: 在单个TCP连接上提供全双工、双向通信通道。连接建立后,客户端和服务器可以随时主动向对方发送数据。
  • 优势: 真正的低延迟实时通信,协议头开销小,非常适合频繁交互的场景。
  • 劣势: 实现相对复杂,需要考虑连接保持、重连等,部分老旧浏览器不支持。

对于我们要构建的即时通讯服务WebSocket无疑是现阶段的最佳选择。幸运的是,Netty提供了对WebSocket协议的原生、高性能支持,让我们可以聚焦于业务逻辑,而无需深入处理底层的协议解析和通信细节。

代码实现

本节将展示基于Netty开发即时通讯服务的核心代码,分为前端交互和后端服务两部分。

前端核心通信逻辑

本文重点在于后端服务构建,因此前端仅展示与Netty服务器通信的核心代码,涵盖连接建立、消息收发与状态管理。

// 1. 全局配置
globalData: {
    chatServerUrl: "ws://127.0.0.1:875/ws", // WebSocket服务器地址
    CHAT: null, // WebSocket连接实例
    chatSocketOpen: false, // 连接状态标志
},

// 2. 应用启动时初始化连接
onLaunch: function() {
    this.doConnect(false);
},

// 3. 核心连接方法
doConnect(isFirst) {
    if (isFirst) {
        uni.showToast({ icon: "loading", title: "断线重连中...", duration: 2000 });
    }
    var me = this;
    // 仅当用户已登录时建立连接
    if (me.getUserInfoSession()) {
        me.globalData.CHAT = uni.connectSocket({ url: me.globalData.chatServerUrl });

        // 4. 连接成功事件
        me.globalData.CHAT.onOpen(function() {
            me.globalData.chatSocketOpen = true;
            console.log("WebSocket连接已打开");
            // 发送初始化消息,告知服务器用户身份 (msgType: 0)
            var initMsg = JSON.stringify({
                chatMsg: { senderId: me.getUserInfoSession().id, msgType: 0 }
            });
            me.globalData.CHAT.send({ data: initMsg });
        });

        // 5. 连接关闭事件
        me.globalData.CHAT.onClose(function() {
            me.globalData.chatSocketOpen = false;
            console.log("WebSocket连接已关闭");
        });

        // 6. 接收消息事件
        me.globalData.CHAT.onMessage(function(res) {
            console.log('收到服务器消息:', res.data);
            me.dealReceiveLastestMsg(JSON.parse(res.data)); // 处理消息
        });

        // 7. 连接错误事件
        me.globalData.CHAT.onError(function() {
            me.globalData.chatSocketOpen = false;
            console.log('WebSocket连接失败');
        });
    }
},

// 8. 通用发送消息方法
sendSocketMessage(msg) {
    if (this.globalData.chatSocketOpen) {
        uni.sendSocketMessage({ data: msg });
    } else {
        uni.showToast({ icon: "none", title: "连接已断开" });
    }
},
// ... (dealReceiveLastestMsg, saveLastestMsgToLocal 等消息处理方法)
后端服务搭建
第一步:添加依赖

pom.xml中引入Netty依赖(请使用Maven中央仓库的最新稳定版本)。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.108.Final</version> <!-- 示例版本,请查阅最新 -->
</dependency>
第二步:创建服务器启动类 (ChatServer.java)

这是Netty服务的入口,负责配置和启动服务器。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ChatServer {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // 接收连接
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理IO

        try {
            // 2. 创建服务器引导
            ServerBootstrap server = new ServerBootstrap();
            server.group(bossGroup, workerGroup)
                  .channel(NioServerSocketChannel.class) // 使用NIO传输
                  .childHandler(new WSServerInitializer()); // 指定连接处理器

            // 3. 绑定端口,启动服务
            ChannelFuture channelFuture = server.bind(875).sync();
            System.out.println("Netty WebSocket 服务器启动在端口: 875");
            channelFuture.channel().closeFuture().sync(); // 等待服务端Socket关闭
        } finally {
            // 4. 优雅关闭线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
第三步:创建通道初始化器 (WSServerInitializer.java)

为每个新连接配置处理流水线(Pipeline)。

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WSServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 处理HTTP请求的编解码
        pipeline.addLast(new HttpServerCodec());
        // 支持大数据流写入
        pipeline.addLast(new ChunkedWriteHandler());
        // 将HTTP消息的多个部分聚合为一个完整的FullHttpRequest/Response
        pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 64KB
        // 处理WebSocket握手和协议帧
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        // 添加自定义的业务处理器
        pipeline.addLast(new ChatHandler());
    }
}
第四步:创建会话管理器 (UserChannelSession.java)

管理用户与Channel的映射关系,支持同一用户多端登录。

import io.netty.channel.Channel;
import java.util.*;
public class UserChannelSession {
    private static Map<String, List<Channel>> userChannelMap = new HashMap<>(); // userId -> Channels
    private static Map<String, String> channelUserMap = new HashMap<>(); // channelId -> userId

    public static void bind(String userId, Channel channel) {
        String channelId = channel.id().asLongText();
        channelUserMap.put(channelId, userId);
        userChannelMap.computeIfAbsent(userId, k -> new ArrayList<>()).add(channel);
    }

    public static void unbind(Channel channel) {
        String channelId = channel.id().asLongText();
        String userId = channelUserMap.remove(channelId);
        if (userId != null) {
            List<Channel> channels = userChannelMap.get(userId);
            if (channels != null) {
                channels.remove(channel);
                if (channels.isEmpty()) {
                    userChannelMap.remove(userId);
                }
            }
        }
    }

    public static List<Channel> getChannelsByUserId(String userId) {
        return userChannelMap.getOrDefault(userId, Collections.emptyList());
    }
}
第五步:创建核心消息处理器 (ChatHandler.java)

处理WebSocket消息,包括连接初始化、消息转发等核心逻辑。这里为消息类型(msgType)预留了扩展接口,本次仅实现文字聊天。

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import com.alibaba.fastjson.JSON;
import java.time.LocalDateTime;

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    // 用于记录和管理所有活跃的客户端Channel
    private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String json = msg.text();
        DataContent dataContent = JSON.parseObject(json, DataContent.class);
        ChatMsg chatMsg = dataContent.getChatMsg();
        Integer msgType = chatMsg.getMsgType();
        String senderId = chatMsg.getSenderId();
        Channel currentChannel = ctx.channel();

        if (msgType == 0) { // 连接初始化消息
            UserChannelSession.bind(senderId, currentChannel);
            System.out.println("用户 " + senderId + " 已连接, Channel ID: " + currentChannel.id());
        } else if (msgType == 1) { // 文字聊天消息
            chatMsg.setChatTime(LocalDateTime.now());
            String receiverId = chatMsg.getReceiverId();
            // 查找接收者的所有在线Channel
            List<Channel> receiverChannels = UserChannelSession.getChannelsByUserId(receiverId);
            if (receiverChannels.isEmpty()) {
                chatMsg.setIsReceiverOnLine(false);
                // TODO: 可在此处存储离线消息
            } else {
                chatMsg.setIsReceiverOnLine(true);
                dataContent.setChatMsg(chatMsg);
                String responseJson = JSON.toJSONString(dataContent);
                // 向接收者的所有在线终端转发消息
                for (Channel rc : receiverChannels) {
                    rc.writeAndFlush(new TextWebSocketFrame(responseJson));
                }
            }
            // 也可选择将消息回发给发送者一份作为发送成功回执
            // currentChannel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(dataContent)));
        }
        // 可扩展其他消息类型,如图片(msgType=2)、语音(3)等
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        clients.remove(channel);
        UserChannelSession.unbind(channel); // 清理会话映射
        System.out.println("客户端断开连接: " + channel.id());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close(); // 发生异常时关闭连接
    }
}

// 辅助POJO类
class DataContent {
    private ChatMsg chatMsg;
    private String chatTime;
    // getter/setter
}
class ChatMsg {
    private String senderId;
    private String receiverId;
    private String msg;
    private Integer msgType;
    private LocalDateTime chatTime;
    private Boolean isReceiverOnLine;
    // getter/setter
}

完整流程梳理:

  1. 启动ChatServer.main()启动,创建线程组并绑定875端口。
  2. 初始化:新客户端连接时,WSServerInitializer为其Channel配置好HTTP和WebSocket处理器链。
  3. 连接建立:连接成功后,ChatHandler.handlerAdded()将该Channel加入全局clients组。
  4. 身份绑定:客户端发送msgType=0的初始化消息,服务器在UserChannelSession中记录userIdChannel的映射。
  5. 消息转发:客户端A发送聊天消息(msgType=1)给B,服务器查找B的所有在线Channel并进行转发。
  6. 连接断开:客户端断开或异常时,handlerRemoved()exceptionCaught()负责清理该Channel及相关映射。
效果演示

7.gif

小结

至此,我们成功构建了一个基于Spring Boot 3(可通过简单整合)与Netty的高性能WebSocket即时通讯服务核心框架。尽管当前实现侧重于展示核心流程,在消息持久化、离线推送、群聊、安全认证等方面仍有完善空间,但它清晰地演示了利用Netty处理高并发实时通信的完整架构。

基于此框架,可以方便地进行功能扩展,例如:

  • 集成Redis发布订阅或Kafka实现消息队列,处理异步任务与离线消息。
  • 集成MySQLMongoDB实现消息的持久化存储与历史查询。
  • 扩展ChatHandler,支持图片、文件、语音等多媒体消息类型。
  • 添加基于Token的连接认证机制。

希望本文提供的思路和代码示例,能够帮助你理解Netty的核心原理,并成为你构建更强大、更完备的实时通信系统的坚实基础。




上一篇:UNet图像分割网络详解:从基础架构到PyTorch实战与医学影像应用
下一篇:DBA面试必备:五款主流数据库(Oracle/MySQL/PostgreSQL/SQL Server/DB2)核心架构图解析
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-17 14:39 , Processed in 0.112173 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表