找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1482

积分

0

好友

194

主题
发表于 昨天 02:05 | 查看: 2| 回复: 0

在开发工作中,我们经常接触到HTTP协议、TCP/IP协议、Socket、Socket长连接和连接池这些概念。它们之间的关系、区别及底层原理,构成了网络编程的基础。本文将从网络协议模型开始,逐步解析这些核心概念,并通过Node.js代码示例演示Socket长连接与连接池的实践。

七层网络模型

理解网络通信,首先要了解经典的分层模型:OSI(开放式系统互联)七层模型。该模型将通信过程分为七层,自下而上依次是:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。每一层都有其特定的职责和协议。

OSI七层模型与对应协议示意图

从上图可以清晰地看到,IP协议工作在网络层,TCP和UDP协议位于传输层,而常见的HTTP协议则属于应用层。那么,常说的Socket又对应哪一层呢?我们稍后结合代码详细说明。

TCP和UDP连接

传输层的TCP和UDP是我们最常打交道的协议。普遍的认识是TCP可靠而UDP不可靠,且UDP传输速度更快。这背后的原因是什么?让我们从TCP建立连接的过程——三次握手开始分析。

TCP的三次握手和四次挥手

TCP以其可靠性著称,这源于其建立和断开连接的严谨过程。

TCP三次握手与四次挥手时序图

三次握手建立连接:

  1. 第一次握手:客户端发送连接请求报文。将标志位SYN置为1,并生成一个随机序列号(Sequence Number)x。随后,客户端进入SYN_SENT状态,等待服务器确认。
  2. 第二次握手:服务器收到SYN报文后,需要确认。它设置确认号(Acknowledgment Number)为x+1。同时,服务器自己也发送一个SYN报文,SYN置为1,序列号为y。服务器将确认和请求信息合并为SYN+ACK报文发送给客户端,并进入SYN_RCVD状态。
  3. 第三次握手:客户端收到服务器的SYN+ACK报文。它将确认号设置为y+1,并向服务器发送ACK报文。此报文发送完毕后,客户端和服务器都进入ESTABLISHED状态,至此连接建立成功,可以开始传输数据。

四次挥手断开连接:

  1. 第一次挥手:主动关闭方(客户端或服务器)发送FIN报文,请求关闭连接,并进入FIN_WAIT_1状态。
  2. 第二次挥手:被动关闭方收到FIN报文后,发送ACK报文进行确认,确认号为收到的序列号加1。主动关闭方收到ACK后进入FIN_WAIT_2状态。
  3. 第三次挥手:被动关闭方处理完数据后,也发送一个FIN报文,请求关闭连接,进入LAST_ACK状态。
  4. 第四次挥手:主动关闭方收到FIN报文后,发送ACK报文确认。随后主动关闭方进入TIME_WAIT状态,等待一段时间(2MSL)后彻底关闭。被动关闭方收到ACK后立即关闭连接。

可以看到,一次完整的TCP连接建立与关闭至少需要7次报文交换,这还不包括实际的数据传输。而UDP则不需要这些握手和挥手过程。

TCP和UDP的区别

基于上述过程,两者的核心区别在于:

  1. 连接 vs 无连接:TCP是面向连接的协议,三次握手在最大程度上保障了连接的可靠性。UDP则是无连接的,发送数据前无需建立连接,也不提供确认机制,因此它是不可靠的传输协议。
  2. 效率与实时性:正是由于UDP的简单性,其开销更小,数据传输速率更高,无需等待确认,实时性更好。例如,早期采用TCP的MSN传输文件比采用UDP的QQ慢,但程序员可以在应用层为UDP增加校验逻辑,在保证一定可靠性的同时获得比TCP更高的效率。

常见问题与分析

在实际应用中,关于传输层常有一些疑问。

1. TCP服务器最大并发连接数是多少?
一种常见的误解是“端口号上限为65535,所以最大并发连接数也是65535”。一条TCP连接由四元组唯一标识:客户端IP、客户端端口、服务端IP、服务端端口。因此,对于服务器的一个端口,其能接受的连接数理论上是所有客户端IP数乘以客户端端口数,远超65535。实际限制往往来自操作系统可打开的文件描述符数量。在Linux中,可以通过ulimit -n查看和修改。

也可以通过修改系统配置文件 /etc/security/limits.conf 来调整:
limits.conf配置文件示例

2. 为什么TIME_WAIT状态需要等待2MSL?
在四次挥手中,主动关闭方发送最后一个ACK后进入TIME_WAIT状态。等待2MSL(报文最大生存时间)是为了防止这个ACK报文丢失。如果ACK丢失,被动关闭方会因超时而重发FIN报文。TIME_WAIT状态的存在使得主动关闭方可以重发这个丢失的ACK,确保连接能正常关闭。

3. TIME_WAIT状态过多会引发什么问题?如何解决?
TIME_WAIT状态下的连接会占用一个本地端口。在高并发短连接场景下(如压力测试),客户端可能会快速产生大量TIME_WAIT连接,耗尽可用端口,导致新的连接抛出“Address already in use”错误。

可以通过调整Linux内核网络参数来缓解。编辑 /etc/sysctl.conf 文件:
编辑sysctl.conf文件命令

添加或修改以下参数:
TCP内核参数配置示例

执行 sudo sysctl -p 使配置生效。

  • net.ipv4.tcp_tw_reuse = 1:允许将处于TIME-WAIT的socket重新用于新的TCP连接。
  • net.ipv4.tcp_tw_recycle = 1:启用TIME-WAIT sockets的快速回收。(注意:此参数在NAT环境下可能导致问题,新版内核已废弃)
  • net.ipv4.tcp_fin_timeout:调整系统默认的FIN-WAIT-2状态超时时间。

Socket与长连接

那么,Socket到底是什么?Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口,将复杂的协议操作封装成简单的函数调用,让程序员可以更方便地进行网络编程。在OSI模型中,Socket可以看作是传输层与应用层之间的一个接口。

长连接指在一个TCP连接上可以连续发送多个数据包。在连接保持期间,如果没有数据发送,通信双方需要通过发送“心跳包”来维持此连接。
短连接则是每次数据传输都新建一个TCP连接,传输完毕立即断开。例如,HTTP/1.0就是典型的短连接模型。

何时使用长连接或短连接?
长连接适用于点对点、操作频繁且连接数不宜过多的场景,如数据库连接。它可以避免频繁建立/断开TCP连接带来的延迟和资源消耗。短连接则适用于像传统HTTP请求这样突发、零散的通信。

为什么需要心跳包?
心跳包是客户端和服务端之间定时发送的自定义通知报文,用于告知对方自己依然“在线”。网络情况复杂,一方断网后,另一方可能无法立刻感知Socket已失效。心跳机制可以定期检测连接的健康状况,确保连接的有效性。TCP自身也提供了可选的Keep-Alive机制,但时间间隔较长;自定义心跳包更为灵活。

实践:Node.js实现Socket心跳机制

下面我们用Node.jsnet模块实现一个带心跳检测的TCP服务器和客户端。

服务端代码:

const net = require('net');
let clientList = [];
const heartbeat = 'HEARTBEAT'; // 定义心跳包内容确保和平时发送的数据不会冲突
const server = net.createServer();
server.on('connection', (client) => {
    console.log('客户端建立连接', client.remoteAddress + ':' + client.remotePort);
    clientList.push(client);
    client.on('data', (chunk) => {
        let content = chunk.toString();
        if (content === heartbeat) {
            console.log('收到客户发过来的一个心跳包');
        } else {
            console.log('收到客户发过来的数据:', content);
            client.write('服务器的数据:' + content);
        }
    });
    client.on('end', () => {
        console.log('收到客户端end');
        clientList.splice(clientList.indexOf(client), 1);
    });
    client.on('error', () => {
        clientList.splice(clientList.indexOf(client), 1);
    });
});
server.listen(9000);
setInterval(broadcast, 10000); // 定时发送心跳包
function broadcast() {
    console.log('broadcast heartbeat, clientList.length: ' + clientList.length);
    let cleanup = [];
    for (let i = 0; i < clientList.length; i++) {
        if (clientList[i].writable) {
            clientList[i].write(heartbeat);
        } else {
            console.log('一个无效的客户端');
            clientList[i].destroy();
            cleanup.push(i);
        }
    }
    for (let i = 0; i < cleanup.length; i++) {
        console.log('删除无效的客户端:', cleanup[i], clientList[cleanup[i]].name);
        clientList.splice(clientList.indexOf(cleanup[i]), 1);
    }
}
// Remove dead Nodes out of write loop to avoid trashing loop index

心跳机制服务端代码截图

服务端运行日志:
服务端运行日志输出

客户端代码:

const net = require('net');
const heartbeat = 'HEARTBEAT';
const client = new net.Socket();
client.connect(9000, '127.0.0.1', () => {});
client.on('data', (chunk) => {
    let content = chunk.toString();
    if (content === heartbeat) {
        console.log('收到心跳包: ', content);
    } else {
        console.log('收到数据: ', content);
    }
});
// 定时发送数据
setInterval(() => {
    console.log('发送数据', new Date().toUTCString());
    client.write(new Date().toUTCString());
}, 5000);
// 定时发送心跳包
setInterval(function () {
    client.write(heartbeat);
}, 10000);

心跳机制客户端代码截图

客户端运行日志:
客户端运行日志输出

自定义应用层协议

仅仅传输原始字节数据是没有意义的,必须定义应用层协议(如HTTP、MQTT)来赋予数据含义。在TCP上自定义协议,需要解决几个问题:

  1. 心跳包格式的定义与处理。
  2. 报文头的定义,用于指明后续数据的长度。
  3. 数据包的序列化格式,如JSON、Protobuf等。

我们定义一个简单的协议:

  • 报文头格式length:000000000xxxx,其中xxxx为数据长度,总长20字节。
  • 序列化方式:JSON。

服务端代码:

const net = require('net');
const server = net.createServer();
let clientList = [];
const HeartBeat = 'HeartBeat';
// 定义心跳包内容确保平时发送的数据不会丢失
const getHeader = (num) => {
    return length => (Array(13).join(0) + num).slice(-13);
};
server.on('connection', client => {
    client.name = client.remoteAddress + ':' + client.remotePort;
    console.log('客户建立连接:', client.name);
    clientList.push(client);
    let chunks = [];
    let length = 0;
    client.on('data', chunk => {
        let content = chunk.toString();
        if (content === heartBeat) {
            console.log('收到客户端发过来的一个心跳包');
        } else {
            if (content.indexOf('length:') >= 0) {
                length = parseInt(content.substring(7,20));
                console.log('length:', length);
                chunks.push(chunk);
            } else {
                let heap = Buffer.concat(chunks);
                if (heap.length >= length) {
                    try {
                        console.log('收到数据:', JSON.parse(heap.toString()));
                        let data = JSON.parse(heap.toString());
                        let dataBuff = Buffer.from(JSON.stringify(data));
                        client.write(header);
                        client.write(dataBuff);
                    } catch (err) {
                        console.log('数据解析失败');
                    }
                }
            }
        }
    });
    client.on('end', () => {
        console.log('收到客户断开');
        clientList.splice(clientList.indexOf(client), 1);
    });
    client.on('error', () => {
        console.log('客户断开');
        clientList.splice(clientList.indexOf(client), 1);
    });
});
server.listen(9000);
setInterval(broadcast, 10000); // 定时向客户广播 并发送心跳包
function broadcast() {
    console.log('broadcast heartbeat', clientList.length);
    let cleanup = [];
    for(var i=0; i<clientList.length; i++) {
        if(clientList[i].writable) { // 确认 sockets 是否可写
            console.log('一个无效的客户心跳');
            // 发送心跳报文
            // 清理无效连接,销毁之前用 Socket.destroy() 用 API 的方法销毁。
            cleanup.push(clientList[i]);
            clientList[i].destroy();
        }
    }
    for(let i=0; i<cleanup.length; i++) {
        console.log('删除无效的客户:', cleanup[i].name);
        clientList.splice(clientList.indexOf(cleanup[i]), 1);
    }
}

自定义协议服务端代码截图

服务端日志:
自定义协议服务端运行日志

客户端代码:

const net = require('net');
const client = new net.Socket();
const heartBeat = 'HeartBeat'; // 定义心跳包内容确保和平时发送的数据不会冲突
const getHeader = (num) => {
    return 'length:' + (Array(13).join(0) + num).slice(-13);
};
client.connect(9000, '127.0.0.1', function () {});
let chunks = [];
client.on('data', (chunk) => {
    let content = chunk.toString();
    console.log("content:", content, content.length);
    if (content === heartBeat) {
        console.log('收到服务端发过来的一个心跳包');
    } else {
        if (content.indexOf('length:') === 0) {
            length = parseInt(content.substring(7,20));
            console.log('length', length);
            chunks = [chunk.slice(20, chunk.length)];
        } else {
            chunks.push(chunk);
        }
    }
    let heap = Buffer.concat(chunks);
    console.log('heap.length', heap.length);
    if (heap.length >= length) {
        try {
            console.log('收到数据', JSON.parse(heap.toString()));
        } catch (err) {
            console.log('数据解析失败');
        }
    }
});
// 定时发送数据
setInterval(function () {
    let data = new Date().toUTCString();
    let dataBuff = Buffer.from(JSON.stringify(data));
    let header = getHeader(dataBuff.length);
    client.write(header);
    client.write(dataBuff);
}, 5000);
// 定时发送心跳包
setInterval(function () {
    client.write(heartBeat);
}, 10000);

自定义协议客户端代码截图

客户端日志:
自定义协议客户端运行日志

以上示例展示了单个客户端的长连接通信。但在高并发场景下,如果多个请求复用同一个Socket连接,数据可能发生粘包/拆包,难以区分响应与请求的对应关系。此时,就需要引入连接池来管理多个长连接。

Socket连接池实战

连接池维护着一组可复用的Socket长连接。它能自动检测连接的有效性,剔除失效连接,并动态管理连接数量。一个典型的连接池包含以下组件:

  1. 空闲连接队列
  2. 活跃连接队列
  3. 等待获取连接的请求队列
  4. 失效连接剔除机制
  5. 连接数配置(最小、最大连接数)
  6. 新建连接的功能

工作流程:请求到达时,首先尝试从空闲队列获取连接;若无空闲连接且活跃连接数未达上限,则创建新连接;若已达上限,则请求进入等待队列。当活跃连接完成请求后,它被移回空闲队列,并尝试满足等待队列中的请求。

下面使用Node.js的通用连接池模块 generic-pool 来实现。

主要目录结构:
generic-pool模块目录结构

初始化连接池:

'use strict'
const net = require('net');
const genericPool = require('generic-pool');
function createPool(config) {
    let options = Object.assign({
        fifo: true,
        // 是否优先使用老的资源
        priorityRange: 1,
        // 优先级
        testOnBorrow: true,
        // 借出后立即校验
        autostart: true,
        // 自动启动连接池
        min: 10,
        // 最小连接数
        max: 10,
        // 最大连接数
        evictionRunIntervalMillis: 0,
        // 资源释放检查间隔时间
        numTestsPerEvictionRun: 3,
        // 每次释放检查次数
        softIdleTimeoutMillis: 1,
        // 空闲超时时间(毫秒)
        idleTimeoutMillis: 30000,
        // 可用的超过了最小的min 且空闲时间时间 达到释放
        maxWaitingClients: 59,
        // 最大等待客户数
        // 配置选项
    }, config.options);
    const factory = function () {
        return new Promise((resolve, reject) => {
            let socket = new net.Socket();
            socket.setKeepAlive(true);
            socket.connect(config.port, config.host);
            socket.on('connect', () => {
                console.log('socket_pool', config.host, config.port, 'connect');
                resolve(socket);
            });
            socket.on('close', (err) => { // 关闭事件 close事件
                console.log('socket_pool', config.host, config.port, 'close', err);
                socket.destroy();
                console.log('error', (err) => {
                    console.log('socket_pool', config.host, config.port, 'error', err, err);
                    reject(err);
                });
            });
        });
    };
    //销毁连接
    destroy = function (socket) {
        return new Promise((resolve) => {
            // 第一次会触发发close事件 如果有message会触发error事件
            socket.destroy();
            resolve();
        });
    };
    validate = function (socket) { // 评估资源池检查是否有效性
        if (socket.destroyed || !socket.readable || !socket.writable) {
            return resolve(false);
        }
        return resolve(true);
    };
};
const pool = genericPool.createPool(factory, options);
// 此时触发创建连接 让池子自动启动连接池
pool.on('factoryCreateError', (err) => {
    const clientResourceRequest = pool._waitingClientsQueue.dequeue();
    if (clientResourceRequest) {
        clientResourceRequest.reject(err);
    }
});
return pool;
}
let pool = createPool({
    port: 9000,
    host: '127.0.0.1',
    options: { min: 0, max: 10 }
});

连接池初始化代码截图

使用连接池进行请求(基于自定义协议):

let pool = createPool({ port: 9000, host: '127.0.0.1', options: {min: 0, max: 10} });
const getHeader = (num) => {
    return 'length:' + (Array(13).join(0) + num).slice(-13);
}
const request = async (requestDataBuff) => {
    let client;
    try {
        client = await pool.acquire();
    } catch (e) {
        console.log('acquire socket client failed: ', e);
        throw e;
    }
    let timeout = 10000;
    return new Promise((resolve, reject) => {
        let chunks = [];
        let length = 0;
        client.setTimeout(timeout);
        client.removeAllListeners('error');
        client.on('error', (err) => {
            client.removeAllListeners('error');
            client.removeAllListeners('data');
            client.removeAllListeners('timeout');
            pool.destroyed(client);
            reject(err);
        });
        client.on('timeout', () => {
            client.removeAllListeners('error');
            client.removeAllListeners('data');
            client.removeAllListeners('timeout');
            // 应该销毁以防止下一个req的数据事件监听才返回数据
            pool.destroy(client);
            reject('socket connect timeout set ${timeout}');
        });
        client.write(requestDataBuff);
        client.on('data', (chunk) => {
            let content = chunk.toString();
            console.log('content', content, content.length);
            // TODO 过滤心包
            if (content.indexOf('length:') === 0) {
                length = parseInt(content.substring(7,20));
                console.log('length', length);
                chunks=[chunk.slice(20, chunk.length)];
            } else {
                chunks.push(chunk);
            }
            let heap = Buffer.concat(chunks);
            console.log('heap.length', heap.length);
            if (heap.length >= length) {
                pool.release(client);
                client.removeAllListeners('error');
                client.removeAllListeners('data');
                client.removeAllListeners('timeout');
                try {
                    // console.log('收到数据', JSON.parse(heap.toString()));
                    resolve(JSON.parse(heap.toString()));
                } catch (err) {
                    reject(err);
                    console.log('数据解析失败');
                }
            }
        });
    });
}
request(Buffer.from(JSON.stringify({a: 'a'})))
    .then((data) => {
        console.log('收到服务的数据', data)
    }).catch(err => {
        console.log(err);
    })
request(Buffer.from(JSON.stringify({b: 'b'})))
    .then((data) => {
        console.log('收到服务的数据', data)
    }).catch(err => {
        console.log(err);
    })
setTimeout(function() {
    //查看是否含复用Socket 有没有建立新的连接
    request(Buffer.from(JSON.stringify({c: 'c'})))
        .then((data) => {
            console.log('收到服务的数据', data)
        }).catch(err => {
            console.log(err);
        })
}, 1000)
request(Buffer.from(JSON.stringify({d: 'd'})))
    .then((data) => {
        console.log('收到服务的数据', data)
    }).catch(err => {
        console.log(err);
    })

使用连接池的客户端代码截图

运行日志:
连接池运行日志输出

从日志可以看到,前两个请求建立了新的Socket连接(socket_pool 127.0.0.1 9000 connect),而后续的请求(如{c: 'c'})则复用了连接池中已有的连接,没有触发新的连接创建。

连接池源码浅析

generic-pool的核心逻辑位于lib/Pool.js。其构造函数初始化了各种队列(空闲资源、等待客户端等):
Pool.js构造函数代码截图

acquire方法是获取资源的核心,它处理了资源请求、排队、创建新资源或分发现有资源的完整逻辑:
Pool.acquire方法部分代码截图

内部的_dispatch_dispatchResource方法负责资源调度和分发,确保请求能高效地获取到可用的连接:
资源调度_dispatch方法代码截图
资源分发_dispatchResource方法代码截图

总结

我们从OSI七层模型切入,分析了TCP/UDP的核心区别与握手过程,探讨了TIME_WAIT等实战问题。然后深入Socket编程,通过Node.js示例实现了心跳机制和自定义应用层协议。最后,面对高并发场景下的连接管理需求,引入了Socket连接池的概念,并利用generic-pool库演示了其强大功能。理解这些层层递进的知识,对于构建高性能、高可靠的网络应用至关重要。

希望这篇从原理到实战的解析,能帮助你彻底理清TCP、HTTP、Socket及连接池之间的关系。如果你对网络编程或系统架构有更多兴趣,欢迎在云栈社区与其他开发者交流探讨。




上一篇:OpenClaw AI智能体失控:自主“网暴”开源维护者,首例现实案例警示AI安全风险
下一篇:Xbox人事变动:传奇掌门菲尔·斯宾塞退休,微软游戏业务战略调整分析
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-23 09:03 , Processed in 0.602884 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表