找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2601

积分

0

好友

361

主题
发表于 4 天前 | 查看: 11| 回复: 0

当 HTTP 协议的无状态特性遇到实时数据推送的迫切需求时,WebSocket 技术便成为构建现代 Java 全双工通信应用的核心桥梁。如今,从在线聊天室、实时股票行情到协同编辑和互动游戏,实时通信功能已成为互联网应用的标配。

本文将深入探讨在 Spring Boot 框架中实现 WebSocket 的全过程,内容覆盖从基础配置、连接管理到高并发场景下的性能优化,旨在帮助你构建一个健壮、可扩展的实时通信系统。

WebSocket 协议基础与优势

WebSocket 是一种在单个 TCP 连接 上提供全双工通信的协议。相较于传统的 HTTP 轮询方案,它在实时性、通信效率和服务器资源利用率方面具有显著优势。

HTTP轮询 vs WebSocket 连接

使用 HTTP 实现实时通信的传统方法包括短轮询、长轮询和 Server-Sent Events (SSE),但它们都存在各自的局限性:

  • 短轮询:客户端定期向服务器发送请求,产生大量无效请求,造成带宽浪费。
  • 长轮询:服务器保持请求直到有数据或超时,导致连接频繁重建,消耗资源。
  • SSE:仅支持服务器向客户端的单向推送,无法实现真正的双向通信。

WebSocket 协议通过在 HTTP 握手成功后建立的持久连接上交换数据帧,完美解决了上述问题,实现了低延迟的双向实时通信。

// WebSocket通信流程示例

// 1. HTTP握手请求
GET /websocket-endpoint HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

// 2. 服务器响应握手成功
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

一旦 WebSocket 连接建立,通信双方可以随时发送消息,无需等待传统的请求-响应循环,消息延迟可降低至毫秒级。

Spring Boot WebSocket 完整配置

Spring Framework 为 WebSocket 提供了全面的支持,涵盖了从协议处理到消息转换的完整开发生态。

核心依赖配置

首先,确保你的项目中引入了必要的依赖。

<!-- Maven配置 -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <!-- 可选:STOMP协议支持 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-messaging</artifactId>
    </dependency>
</dependencies>

WebSocket 配置类实现

创建一个配置类来启用 WebSocket 支持并定义端点。

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWebSocketHandler(), "/ws/message")
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOriginPatterns("*"); // 生产环境应严格限制来源

        // SockJS后备选项支持,用于兼容不支持WebSocket的浏览器
        registry.addHandler(myWebSocketHandler(), "/ws/sockjs/message")
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .withSockJS()
                .setHeartbeatTime(25000); // 25秒心跳
    }

    @Bean
    public WebSocketHandler myWebSocketHandler() {
        return new MyWebSocketHandler();
    }

    // 配置消息统计
    @Bean
    public WebSocketMessageBrokerStats webSocketMessageBrokerStats() {
        WebSocketMessageBrokerStats stats = new WebSocketMessageBrokerStats();
        stats.setLoggingPeriod(30 * 1000); // 每30秒记录一次统计信息
        return stats;
    }
}

消息缓冲区优化配置

对于高并发场景,合理配置消息缓冲区的大小至关重要,这能有效避免因缓冲区不足导致的连接异常或性能瓶颈。

# application.yml配置
spring:
  websocket:
    # 配置发送缓冲区大小
    send-buffer-size: 512KB
    # 配置接收缓冲区大小
    receive-buffer-size: 512KB
    # 配置单个消息大小限制
    message-size-limit: 128KB
    # 配置消息发送超时时间(毫秒)
    send-time-limit: 10000

消息处理器与连接管理

消息处理器是 WebSocket 应用的核心组件,负责管理连接的生命周期(建立、通信、关闭)和处理消息的流转逻辑。

自定义 WebSocket 处理器

下面是一个功能完整的自定义处理器示例,实现了连接管理、消息分发和群组功能。

@Component
public class MyWebSocketHandler extends TextWebSocketHandler {

    // 使用线程安全的ConcurrentHashMap管理所有活跃连接
    private static final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    // 按群组管理连接,提高群发消息时的查找效率
    private static final ConcurrentHashMap<String, Set<String>> groupSessions = new ConcurrentHashMap<>();

    private static final Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String sessionId = session.getId();
        String userId = extractUserId(session);

        sessions.put(sessionId, session);
        // 将用户加入默认群组
        addUserToGroup("default", sessionId);

        logger.info("WebSocket连接建立: sessionId={}, userId={}, 当前连接数={}",
                   sessionId, userId, sessions.size());

        // 发送欢迎消息
        sendMessageToSession(session, new TextMessage("{\"type\":\"welcome\",\"message\":\"连接成功\"}"));
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String payload = message.getPayload();
        logger.debug("收到消息: sessionId={}, payload={}", session.getId(), payload);

        try {
            // 解析JSON格式的消息体
            JsonNode jsonNode = objectMapper.readTree(payload);
            String messageType = jsonNode.get("type").asText();

            switch (messageType) {
                case "chat":
                    handleChatMessage(session, jsonNode);
                    break;
                case "heartbeat":
                    handleHeartbeat(session, jsonNode);
                    break;
                case "join_group":
                    handleJoinGroup(session, jsonNode);
                    break;
                case "leave_group":
                    handleLeaveGroup(session, jsonNode);
                    break;
                default:
                    logger.warn("未知的消息类型: {}", messageType);
            }
        } catch (Exception e) {
            logger.error("消息处理异常", e);
            sendErrorMessage(session, "消息格式错误");
        }
    }

    private void handleChatMessage(WebSocketSession session, JsonNode jsonNode) throws IOException {
        String targetType = jsonNode.get("targetType").asText();
        String content = jsonNode.get("content").asText();
        String fromUser = extractUserId(session);

        if ("broadcast".equals(targetType)) {
            // 广播消息给所有连接
            broadcastMessage(content, fromUser);
        } else if ("group".equals(targetType)) {
            String groupId = jsonNode.get("groupId").asText();
            // 发送消息到指定群组
            sendMessageToGroup(groupId, content, fromUser);
        } else if ("private".equals(targetType)) {
            String toUserId = jsonNode.get("toUserId").asText();
            // 发送点对点私聊消息
            sendPrivateMessage(toUserId, content, fromUser);
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        String sessionId = session.getId();
        sessions.remove(sessionId);

        // 从所有群组中移除此会话
        groupSessions.values().forEach(group -> group.remove(sessionId));

        logger.info("WebSocket连接关闭: sessionId={}, 状态码={}, 原因={}, 当前连接数={}",
                   sessionId, status.getCode(), status.getReason(), sessions.size());
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        logger.error("WebSocket传输错误: sessionId=" + session.getId(), exception);
        session.close(CloseStatus.SERVER_ERROR);
    }

    // 广播消息给所有连接
    public void broadcastMessage(String content, String fromUser) {
        String message = createMessageJson("chat", content, fromUser, "broadcast", null);
        TextMessage textMessage = new TextMessage(message);

        sessions.forEach((sessionId, session) -> {
            if (session.isOpen()) {
                try {
                    session.sendMessage(textMessage);
                } catch (IOException e) {
                    logger.error("发送广播消息失败: sessionId=" + sessionId, e);
                }
            }
        });
    }

    // 发送消息到指定群组
    public void sendMessageToGroup(String groupId, String content, String fromUser) {
        Set<String> sessionIds = groupSessions.getOrDefault(groupId, Collections.emptySet());
        String message = createMessageJson("chat", content, fromUser, "group", groupId);
        TextMessage textMessage = new TextMessage(message);

        sessionIds.forEach(sessionId -> {
            WebSocketSession session = sessions.get(sessionId);
            if (session != null && session.isOpen()) {
                try {
                    session.sendMessage(textMessage);
                } catch (IOException e) {
                    logger.error("发送群组消息失败: sessionId=" + sessionId, e);
                }
            }
        });
    }

    // 工具方法:创建标准化的JSON消息
    private String createMessageJson(String type, String content, String fromUser,
                                     String targetType, String targetId) {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("type", type);
        jsonObject.addProperty("content", content);
        jsonObject.addProperty("fromUser", fromUser);
        jsonObject.addProperty("timestamp", System.currentTimeMillis());
        jsonObject.addProperty("targetType", targetType);

        if (targetId != null) {
            jsonObject.addProperty("targetId", targetId);
        }

        return jsonObject.toString();
    }

    // 从Session中提取用户身份标识
    private String extractUserId(WebSocketSession session) {
        Principal principal = session.getPrincipal();
        if (principal != null) {
            return principal.getName();
        }

        // 从HTTP Session属性中获取(通过握手拦截器设置)
        Map<String, Object> attributes = session.getAttributes();
        if (attributes.containsKey("userId")) {
            return (String) attributes.get("userId");
        }

        // 默认返回匿名用户标识
        return "anonymous_" + session.getId().substring(0, 8);
    }

    // 将用户会话添加到指定群组
    private void addUserToGroup(String groupId, String sessionId) {
        groupSessions.computeIfAbsent(groupId, k -> ConcurrentHashMap.newKeySet())
                     .add(sessionId);
    }
}

WebSocket 连接认证与消息处理流程图
图:WebSocket 通信系统的核心流程,展示了从握手认证到消息路由的完整路径

心跳机制与连接健康检查

在长连接场景中,心跳机制是维持连接活性、及时发现并清理“僵尸连接”(已失效但未正常关闭的连接)的关键手段,能有效释放服务器资源。

心跳处理器实现

@Component
public class HeartbeatHandler {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);

    // 心跳超时时间(毫秒),例如30秒无心跳视为连接失效
    private static final long HEARTBEAT_TIMEOUT = 30000;

    // 记录每个会话的最后一次心跳时间
    private final ConcurrentHashMap<String, Long> lastHeartbeatTimes = new ConcurrentHashMap<>();

    @Scheduled(fixedRate = 10000) // 每10秒执行一次健康检查
    public void checkHeartbeats() {
        long currentTime = System.currentTimeMillis();

        lastHeartbeatTimes.entrySet().removeIf(entry -> {
            String sessionId = entry.getKey();
            long lastTime = entry.getValue();

            // 如果超过超时时间未收到心跳,则认为连接已失效
            if (currentTime - lastTime > HEARTBEAT_TIMEOUT) {
                logger.warn("心跳超时,关闭连接: sessionId={}", sessionId);
                // 在实际应用中,这里应关闭对应的WebSocket连接
                return true;
            }
            return false;
        });
    }

    public void updateHeartbeat(String sessionId) {
        lastHeartbeatTimes.put(sessionId, System.currentTimeMillis());
    }

    // 心跳消息处理器
    public void handleHeartbeatMessage(WebSocketSession session, JsonNode jsonNode) {
        String sessionId = session.getId();
        updateHeartbeat(sessionId);

        // 回复心跳响应,确认连接正常
        JsonObject response = new JsonObject();
        response.addProperty("type", "heartbeat_response");
        response.addProperty("timestamp", System.currentTimeMillis());
        response.addProperty("serverTime", System.currentTimeMillis());

        try {
            session.sendMessage(new TextMessage(response.toString()));
        } catch (IOException e) {
            logger.error("发送心跳响应失败", e);
        }
    }
}

跨域处理与安全配置

在生产环境中部署 WebSocket 服务,必须妥善处理跨域请求和安全问题,防止未授权访问和恶意攻击。

跨域配置

在 Spring Security 配置中为 WebSocket 端点开放权限并设置合理的 CORS 策略。

@Configuration
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
            // 配置WebSocket连接权限
            .authorizeRequests()
                .antMatchers("/ws/**").permitAll() // WebSocket端点允许所有连接(生产环境需细化)
                .anyRequest().authenticated()
                .and()
            // 禁用CSRF保护(WebSocket连接通常不需要)
            .csrf().disable()
            // 添加CORS配置
            .cors().configurationSource(corsConfigurationSource());
    }

    @Bean
    public CorsConfigurationSource corsConfigurationSource() {
        CorsConfiguration configuration = new CorsConfiguration();
        configuration.setAllowedOriginPatterns(Arrays.asList(
            "http://localhost:*",
            "https://*.example.com" // 替换为你的实际域名
        ));
        configuration.setAllowedMethods(Arrays.asList("GET", "POST", "OPTIONS"));
        configuration.setAllowedHeaders(Arrays.asList("*"));
        configuration.setAllowCredentials(true);
        configuration.setMaxAge(3600L);

        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", configuration);
        return source;
    }
}

WebSocket 连接认证与授权

在握手阶段进行身份验证,确保只有合法用户能够建立连接。

@Component
public class AuthHandshakeInterceptor extends HttpSessionHandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes)
            throws Exception {

        // 从HTTP请求中提取认证信息
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            HttpServletRequest httpServletRequest = servletRequest.getServletRequest();

            // 检查URL参数或Header中的认证令牌
            String token = httpServletRequest.getParameter("token");
            if (token != null && validateToken(token)) {
                String userId = extractUserIdFromToken(token);
                attributes.put("userId", userId);
                attributes.put("authenticated", true);
            } else {
                // 未认证连接,可以设置有限权限或直接拒绝
                attributes.put("authenticated", false);
                attributes.put("userId", "anonymous");
            }
        }

        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

    private boolean validateToken(String token) {
        // 实际应用中应实现完整的JWT或OAuth token验证逻辑
        return token != null && token.startsWith("valid_");
    }

    private String extractUserIdFromToken(String token) {
        // 从token中提取用户ID,此处为简单示例
        return token.substring(6);
    }
}

实战案例:在线聊天系统

下面我们构建一个功能完整的在线聊天系统,它包含用户管理、群组聊天和私聊等核心功能。

前端 WebSocket 客户端 (HTML/JavaScript)

这是一个具备自动重连和心跳功能的前端客户端实现。

<!DOCTYPE html>
<html>
<head>
    <title>WebSocket聊天室</title>
    <script>
        class WebSocketChatClient {
            constructor() {
                this.socket = null;
                this.userId = 'user_' + Math.random().toString(36).substr(2, 9);
                this.currentGroup = 'general';
                this.reconnectAttempts = 0;
                this.maxReconnectAttempts = 5;
            }

            connect() {
                const wsUrl = `ws://${window.location.host}/ws/message?token=valid_${this.userId}`;
                this.socket = new WebSocket(wsUrl);

                this.socket.onopen = () => {
                    console.log('WebSocket连接已建立');
                    this.reconnectAttempts = 0;

                    // 加入默认群组
                    this.joinGroup(this.currentGroup);

                    // 启动定时心跳
                    this.startHeartbeat();
                };

                this.socket.onmessage = (event) => {
                    const message = JSON.parse(event.data);
                    this.handleMessage(message);
                };

                this.socket.onclose = (event) => {
                    console.log('WebSocket连接已关闭', event);
                    this.handleDisconnection();
                };

                this.socket.onerror = (error) => {
                    console.error('WebSocket错误', error);
                };
            }

            handleMessage(message) {
                switch (message.type) {
                    case 'chat':
                        this.displayChatMessage(message);
                        break;
                    case 'heartbeat_response':
                        console.log('收到心跳响应');
                        break;
                    case 'system':
                        this.displaySystemMessage(message);
                        break;
                    case 'user_list':
                        this.updateUserList(message.users);
                        break;
                }
            }

            sendChatMessage(content, targetType = 'group', targetId = this.currentGroup) {
                const message = {
                    type: 'chat',
                    content: content,
                    targetType: targetType,
                    targetId: targetId,
                    timestamp: Date.now()
                };

                this.socket.send(JSON.stringify(message));
            }

            joinGroup(groupId) {
                const message = {
                    type: 'join_group',
                    groupId: groupId,
                    timestamp: Date.now()
                };

                this.socket.send(JSON.stringify(message));
                this.currentGroup = groupId;
            }

            startHeartbeat() {
                // 每20秒发送一次心跳包
                this.heartbeatInterval = setInterval(() => {
                    if (this.socket.readyState === WebSocket.OPEN) {
                        const heartbeat = {
                            type: 'heartbeat',
                            timestamp: Date.now()
                        };
                        this.socket.send(JSON.stringify(heartbeat));
                    }
                }, 20000);
            }

            handleDisconnection() {
                clearInterval(this.heartbeatInterval);

                // 实现指数退避的重连机制
                if (this.reconnectAttempts < this.maxReconnectAttempts) {
                    this.reconnectAttempts++;
                    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);

                    console.log(`${delay/1000}秒后尝试重连,第${this.reconnectAttempts}次`);

                    setTimeout(() => {
                        this.connect();
                    }, delay);
                }
            }

            displayChatMessage(message) {
                const chatArea = document.getElementById('chatArea');
                const messageElement = document.createElement('div');
                messageElement.className = 'message';
                messageElement.innerHTML = `
                    <strong>${message.fromUser}:</strong>
                    <span>${message.content}</span>
                    <small>${new Date(message.timestamp).toLocaleTimeString()}</small>
                `;

                if (message.fromUser === this.userId) {
                    messageElement.classList.add('own-message');
                }

                chatArea.appendChild(messageElement);
                chatArea.scrollTop = chatArea.scrollHeight;
            }
        }

        // 初始化聊天客户端
        document.addEventListener('DOMContentLoaded', () => {
            window.chatClient = new WebSocketChatClient();
            window.chatClient.connect();
        });
    </script>
</head>
<body>
    <div class="chat-container">
        <div class="sidebar">
            <h3>在线用户</h3>
            <div id="userList"></div>
            <h3>聊天群组</h3>
            <div id="groupList">
                <button onclick="chatClient.joinGroup('general')">General</button>
                <button onclick="chatClient.joinGroup('random')">Random</button>
            </div>
        </div>
        <div class="main">
            <div id="chatArea" class="chat-area"></div>
            <div class="input-area">
                <input type="text" id="messageInput" placeholder="输入消息...">
                <button onclick="sendMessage()">发送</button>
            </div>
        </div>
    </div>

    <script>
        function sendMessage() {
            const input = document.getElementById('messageInput');
            const message = input.value.trim();

            if (message) {
                window.chatClient.sendChatMessage(message);
                input.value = '';
            }
        }

        document.getElementById('messageInput').addEventListener('keypress', (e) => {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
    </script>
</body>
</html>

服务端群组管理增强

为了支持更复杂的群聊业务,我们在服务端实现一个集中的群组管理服务。

@Service
public class GroupChatService {

    private static final Logger logger = LoggerFactory.getLogger(GroupChatService.class);

    // 存储群组详细信息
    private final ConcurrentHashMap<String, GroupInfo> groups = new ConcurrentHashMap<>();

    // 记录用户加入了哪些群组
    private final ConcurrentHashMap<String, Set<String>> userGroups = new ConcurrentHashMap<>();

    @Autowired
    private MyWebSocketHandler webSocketHandler;

    /**
     * 创建新群组
     */
    public void createGroup(String groupId, String creatorId, String groupName) {
        GroupInfo groupInfo = new GroupInfo(groupId, groupName, creatorId);
        groups.put(groupId, groupInfo);

        logger.info("群组创建成功: groupId={}, groupName={}, creator={}",
                   groupId, groupName, creatorId);

        // 向所有在线用户广播系统消息
        broadcastSystemMessage(String.format("群组'%s'已创建", groupName));
    }

    /**
     * 用户加入群组
     */
    public void joinGroup(String groupId, String userId, String sessionId) {
        groups.computeIfPresent(groupId, (key, groupInfo) -> {
            groupInfo.addMember(userId);
            return groupInfo;
        });

        userGroups.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet())
                  .add(groupId);

        // 将用户的WebSocket会话加入群组映射
        webSocketHandler.addUserToGroup(groupId, sessionId);

        // 通知群组内其他成员
        sendGroupMessage(groupId,
                String.format("用户 %s 加入了群组", userId),
                "system");

        logger.info("用户加入群组: userId={}, groupId={}", userId, groupId);
    }

    /**
     * 发送群组消息
     */
    public void sendGroupMessage(String groupId, String content, String senderId) {
        GroupInfo groupInfo = groups.get(groupId);
        if (groupInfo == null) {
            throw new IllegalArgumentException("群组不存在: " + groupId);
        }

        // 安全检查:发送者必须是群组成员
        if (!groupInfo.getMembers().contains(senderId)) {
            throw new SecurityException("非群组成员不能发送消息");
        }

        // 委托给WebSocket处理器进行实际的消息发送
        webSocketHandler.sendMessageToGroup(groupId, content, senderId);

        logger.debug("群组消息: groupId={}, sender={}, content={}",
                    groupId, senderId, content);
    }

    /**
     * 获取群组成员列表
     */
    public List<String> getGroupMembers(String groupId) {
        GroupInfo groupInfo = groups.get(groupId);
        return groupInfo != null ? new ArrayList<>(groupInfo.getMembers()) :
               Collections.emptyList();
    }

    /**
     * 广播系统消息
     */
    private void broadcastSystemMessage(String content) {
        JsonObject message = new JsonObject();
        message.addProperty("type", "system");
        message.addProperty("content", content);
        message.addProperty("timestamp", System.currentTimeMillis());

        webSocketHandler.broadcastMessage(message.toString(), "system");
    }

    /**
     * 群组信息内部类
     */
    @Data
    @AllArgsConstructor
    private static class GroupInfo {
        private String groupId;
        private String groupName;
        private String creatorId;
        private Set<String> members = ConcurrentHashMap.newKeySet();
        private long createTime = System.currentTimeMillis();

        public GroupInfo(String groupId, String groupName, String creatorId) {
            this.groupId = groupId;
            this.groupName = groupName;
            this.creatorId = creatorId;
            this.members.add(creatorId);
        }

        public void addMember(String userId) {
            members.add(userId);
        }
    }
}

性能监控与优化策略

在高并发场景下,对 WebSocket 服务进行性能监控和针对性优化是保证系统稳定性的关键。

连接与性能指标监控

集成 Micrometer 等指标库,对关键指标进行收集和暴露。

@Component
public class WebSocketMetrics {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketMetrics.class);

    private final AtomicInteger connectionCount = new AtomicInteger(0);
    private final AtomicLong totalMessages = new AtomicLong(0);
    private final AtomicInteger activeGroups = new AtomicInteger(0);

    // 消息处理耗时分布统计
    private final DistributionSummary messageProcessingTime =
            DistributionSummary.builder("websocket.message.processing.time")
                    .baseUnit("milliseconds")
                    .register(Metrics.globalRegistry);

    // 连接建立耗时统计
    private final Timer connectionEstablishTimer =
            Timer.builder("websocket.connection.establish.time")
                    .register(Metrics.globalRegistry);

    @Scheduled(fixedRate = 60000) // 每分钟记录一次日志
    public void logMetrics() {
        logger.info("WebSocket指标统计 - 连接数: {}, 总消息数: {}, 活跃群组: {}, 平均处理时间: {}ms",
                   connectionCount.get(),
                   totalMessages.get(),
                   activeGroups.get(),
                   messageProcessingTime.mean());
    }

    public void incrementConnectionCount() {
        connectionCount.incrementAndGet();
    }

    public void decrementConnectionCount() {
        connectionCount.decrementAndGet();
    }

    public void incrementMessageCount() {
        totalMessages.incrementAndGet();
    }

    public void recordProcessingTime(long millis) {
        messageProcessingTime.record(millis);
    }

    public Timer.Sample startConnectionTimer() {
        return Timer.start();
    }

    public void stopConnectionTimer(Timer.Sample sample) {
        sample.stop(connectionEstablishTimer);
    }
}

高并发优化建议

  1. 连接数控制

    • 设置合理的最大连接数限制,防止资源耗尽。
    • 实现连接拒绝策略(如返回 503 状态码),保护服务端。
    • 使用连接池管理长时间空闲连接,定期清理。
  2. 内存优化

    • 监控 WebSocket Session 的内存占用,防止内存泄漏。
    • 及时清理断开会话的引用,帮助 GC 回收。
    • 优化消息对象的序列化/反序列化过程,选择高效的 JSON 库(如 Jackson)。
  3. 网络优化

    • 启用 TCP_NODELAY 选项,减少小数据包的延迟。
    • 根据负载测试结果,配置合理的发送和接收缓冲区大小
    • 对文本消息启用 GZIP 压缩,减少带宽占用。
  4. 集群部署

    • 使用 Redis Pub/Sub 或 Kafka 等消息中间件实现跨节点消息广播
    • 根据业务场景,实现会话共享(如将会话信息存入 Redis)或采用粘性会话策略。
    • 设计优雅的节点故障转移机制,确保用户体验不受影响。

总结

面对日益增长的实时通信需求,Spring Boot 与 WebSocket 的组合为开发者提供了一套高效、成熟的解决方案。从基础的协议握手到复杂的高并发场景,从简单的消息推送到功能完整的在线聊天系统,本文展示了构建稳健实时服务所需的核心技术与架构思考。

技术的真正价值往往在极端情况下得以体现:当连接数激增时,你的服务是能够优雅扩展,还是不堪重负?答案隐藏在每一个设计细节和优化策略之中。希望这篇指南能帮助你在 Spring Boot 上构建出既能满足功能需求,又能应对性能挑战的实时通信系统。欢迎在 云栈社区 交流更多关于 WebSocket 和高并发架构的实践经验。




上一篇:阿里云2核2G服务器真能跑Docker吗?实测Nginx、Python、Redis容器部署指南
下一篇:Spring事务应该放在Controller还是Service层?详解正确层次设计
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-24 04:03 , Processed in 0.275069 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表