找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1780

积分

0

好友

238

主题
发表于 昨天 17:44 | 查看: 2| 回复: 0

作为一名开发人员,我们经常听到HTTP协议、TCP/IP协议、UDP协议、Socket、Socket长连接、Socket连接池等术语。然而,它们之间的关系、区别及内在原理,你是否能完全理解清楚?这篇文章将从网络协议基础开始,逐步剖析至Socket连接池的实现,帮你理清它们之间的关联。

七层网络模型

首先,让我们从网络通信的分层模型讲起:七层模型,也称为OSI(Open System Interconnection)模型。自下而上分为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。所有的通信都建立在这个模型之上。

计算机网络OSI七层模型与协议对应图

从上图可知,IP协议对应于网络层,TCP、UDP协议对应于传输层,而HTTP协议对应于应用层。那么,Socket位于哪一层呢?我们将在后续结合代码详细介绍。

TCP和UDP连接

关于传输层的TCP和UDP协议,我们平时接触较多。普遍认为TCP是可靠的、面向连接的,而UDP是不可靠的、无连接的,且UDP传输速度通常比TCP快。这是为什么呢?让我们从TCP建立连接的过程开始分析,进而解释两者的区别。

TCP的三次握手和四次挥手

我们知道,TCP建立连接需要经过三次握手,而断开连接则需要经过四次挥手。那么,这“握手”与“挥手”具体做了什么呢?

TCP三次握手与四次挥手过程时序图

  • 第一次握手:客户端发送连接请求报文段,将SYN标志位设为1,并随机生成一个序列号(Sequence Number)x。随后,客户端进入SYN_SEND状态,等待服务器的确认。
  • 第二次握手:服务器收到客户端的SYN报文段后,需要对此进行确认。它将Acknowledgment Number设为x+1。同时,服务器自己也发送一个SYN报文段,将SYN标志位设为1,序列号设为y。服务器将SYN和ACK信息合并到一个报文段中发送给客户端,此时服务器进入SYN_RECV状态。
  • 第三次握手:客户端收到服务器的SYN+ACK报文段后,将Acknowledgment Number设置为y+1,并向服务器发送一个ACK报文段。此报文段发送完毕后,客户端和服务器都进入ESTABLISHED状态,完成TCP三次握手。

至此,双方可以开始传输数据。通信结束后,需要经过四次挥手来断开连接。

  • 第一次挥手:主动关闭方(客户端或服务器)发送一个FIN报文段,并进入FIN_WAIT_1状态,表示没有数据要发送了。
  • 第二次挥手:被动关闭方收到FIN报文段后,发送一个ACK报文段进行确认,Acknowledgment Number为收到的序列号加1。主动关闭方收到后进入FIN_WAIT_2状态。
  • 第三次挥手:被动关闭方处理完所有数据后,也发送一个FIN报文段,请求关闭连接,并进入LAST_ACK状态。
  • 第四次挥手:主动关闭方收到FIN报文段后,发送一个ACK报文段确认,然后进入TIME_WAIT状态。被动关闭方收到ACK后,立即关闭连接。主动关闭方等待2MSL(最长报文段寿命)时间后,若没有收到重传请求,则也关闭连接。

可以看到,一次TCP连接的建立和关闭至少需要进行7次报文交换,这还不包括实际的数据传输。而UDP则不需要这些握手和挥手过程。

TCP和UDP的区别

  1. 连接性:TCP是面向连接的协议,通过三次握手在最大程度上保证了连接的可靠性。UDP是无连接的,发送数据前无需建立连接,也不对接收到的数据发送确认,因此不可靠,但开销小。
  2. 效率与实时性:由于没有连接建立、维护和确认的开销,UDP的数据传输速率更高,延迟更低,实时性更好。例如,采用UDP的QQ在传输即时消息时比采用TCP的MSN更快。但应用程序可以自己在UDP之上实现可靠性机制(如数据包编号、校验等)。

常见问题解析

关于传输层,我们常会遇到以下问题:

1. TCP服务器最大并发连接数是多少?

有一种误解认为:“因为端口号上限为65535,所以TCP服务器理论上的最大并发连接数也是65535”。这里需要理解一条TCP连接的组成要素:客户端IP、客户端端口、服务端IP、服务端端口。因此,对于TCP服务端进程而言,它能同时处理的客户端连接数并不受限于其自身的端口号。理论上,一个服务器端口可以建立全球IP数量乘以每个客户端端口数量的连接。

实际上,并发连接数受限于操作系统可打开的文件描述符数量。这个限制是可以配置的,可以设置得非常大,所以真正的瓶颈在于系统性能(如内存、CPU)。你可以通过 ulimit -n 命令查看当前进程的文件描述符限制。

也可以通过修改系统配置文件来调整限制:

#vi /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536

修改系统文件描述符限制配置文件

2. 为什么TIME_WAIT状态需要等待2MSL后才能关闭?

这是因为网络是不可靠的。主动关闭方最后发送的ACK报文有可能丢失。如果丢失,处于LAST_ACK状态的被动关闭方会因为超时未收到ACK而重发FIN报文。TIME_WAIT状态的作用就是用来重发可能丢失的ACK报文,确保连接能正确、彻底地关闭。

3. TIME_WAIT状态过多会引发什么问题?

主动关闭连接的一方会进入TIME_WAIT状态,通常持续1-4分钟(Windows为4分钟)。一个TIME_WAIT连接会占用一个本地端口。如果客户端进行压力测试,模拟大量短连接请求,可能会快速产生成千上万个TIME_WAIT连接,导致后续新连接因端口耗尽而失败,报错“address already in use : connect”。

对于服务器,如果作为反向代理(如Nginx)也存在大量短连接,也可能遇到此问题。可以通过调整Linux内核参数来缓解:

vi /etc/sysctl.conf

编辑Linux内核参数配置文件

添加或修改以下内容:

net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_fin_timeout = 30

Linux内核TCP连接优化参数示例

然后执行 /sbin/sysctl -p 让参数生效。

  • net.ipv4.tcp_syncookies = 1:开启SYN Cookies,用于防范少量SYN洪水攻击。
  • net.ipv4.tcp_tw_reuse = 1:允许将TIME-WAIT sockets重新用于新的TCP连接。
  • net.ipv4.tcp_tw_recycle = 1:开启TIME-WAIT sockets的快速回收。
  • net.ipv4.tcp_fin_timeout:修改系统默认的FIN_WAIT_2状态超时时间。

Socket长连接

所谓长连接,是指在一个TCP连接上可以连续发送多个数据包。在连接保持期间,如果没有数据包发送,通信双方需要通过发送检测包(心跳包)来维持此连接。

短连接则是指:有数据交互时建立一个TCP连接,数据发送完成后立即断开。例如,HTTP/1.0通常就是短连接。

长连接操作流程:连接 → 数据传输 → 保持连接(心跳)→ 数据传输 → 保持连接(心跳)→ … → 关闭连接。

何时使用长连接或短连接?

长连接适用于操作频繁、点对点通信,且总连接数不宜过多(因为每个连接都会占用系统资源)的场景。例如数据库连接池。如果每个操作都使用短连接,频繁的三次握手会显著降低处理速度,且频繁创建/销毁Socket也是资源浪费。

什么是心跳包?为什么需要它?

心跳包是客户端和服务端之间定时发送的、用于通知对方自身状态的自定义报文。它的作用是判断一个看似正常的Socket连接是否实际上已经失效(例如一方网络异常断开)。TCP本身提供了Keep-Alive机制,但间隔时间较长。应用程序通常需要自定义心跳机制,以更快的频率(如每秒一次)确认对方是否“在线”,确保连接的有效性。

心跳包实现示例

服务端代码 (Node.js)

const net = require('net');
let clientList = [];
const heartbeat = 'HEARTBEAT'; // 定义心跳包内容确保和平时发送的数据不会冲突
const server = net.createServer();
server.on('connection', (client) => {
    console.log('客户端建立连接', client.remoteAddress + ':' + client.remotePort);
    clientList.push(client);
    client.on('data', (chunk) => {
        let content = chunk.toString();
        if (content === heartbeat) {
            console.log('收到客户发过来的一个心跳包');
        } else {
            console.log('收到客户发过来的数据:', content);
            client.write('服务器的数据:' + content);
        }
    });
    client.on('end', () => {
        console.log('收到客户端end');
        clientList.splice(clientList.indexOf(client), 1);
    });
    client.on('error', () => {
        clientList.splice(clientList.indexOf(client), 1);
    });
});
server.listen(9000);
setInterval(broadcast, 10000); // 定时发送心跳包
function broadcast() {
    console.log('broadcast heartbeat, clientList.length: ' + clientList.length);
    let cleanup = [];
    for (let i = 0; i < clientList.length; i++) {
        if (clientList[i].writable) {
            clientList[i].write(heartbeat);
        } else {
            console.log('一个无效的客户端');
            cleanup.push(clientList[i]);
        }
    }
    for (let i = 0; i < cleanup.length; i++) {
        console.log('删除无效的客户端:', cleanup[i].name);
        clientList.splice(clientList.indexOf(cleanup[i]), 1);
    }
}

Node.js实现的TCP服务器心跳包示例代码

服务端输出
TCP服务器心跳包运行日志输出

客户端代码

const net = require('net');
const heartbeat = 'HEARTBEAT';
const client = new net.Socket();
client.connect(9000, '127.0.0.1', () => {});
client.on('data', (chunk) => {
    let content = chunk.toString();
    if (content === heartbeat) {
        console.log('收到心跳包:', content);
    } else {
        console.log('收到数据:', content);
    }
});
// 定时发送数据
setInterval(() => {
    console.log('发送数据', new Date().toUTCString());
    client.write(new Date().toUTCString());
}, 5000);
// 定时发送心跳包
setInterval(function () {
    client.write(heartbeat);
}, 10000);

Node.js实现的TCP客户端心跳包示例代码

客户端输出
TCP客户端心跳包运行日志输出

定义自定义应用层协议

如果想要在TCP之上传输有业务意义的数据,就需要定义自己的应用层协议,就像HTTP、MQTT、Dubbo那样。自定义协议通常需要解决几个问题:

  1. 心跳包格式的定义及处理(上文已实现)。
  2. 报文头的定义:用于指明后续数据的长度,以便接收方能正确解析。
  3. 数据包的格式:例如使用JSON、Protocol Buffers等进行序列化。

下面我们定义一个简单的协议:

  • 报文头格式length:000000000xxxxxxxx代表后续JSON数据的字节长度,总长度固定为20字节(示例简化,不严谨)。
  • 数据序列化方式:JSON。

服务端代码

const net = require('net');
const server = net.createServer();
let clientList = [];
const HeartBeat = 'HeartBeat'; // 定义心跳包内容确保平时发送的数据不会丢失
const getHeader = (num) => {
    return ‘length:‘ + (Array(13).join(0) + num).slice(-13);
};
server.on('connection', client => {
    client.name = client.remoteAddress + ':' + client.remotePort;
    console.log('客户建立连接', client.name);
    clientList.push(client);
    let chunks = [];
    let length = 0;
    client.on('data', chunk => {
        let content = chunk.toString();
        if (content === heartBeat) {
            console.log('收到客户端发过来的一个心跳包');
        } else {
            if (content.indexOf('length:') >= 0) {
                length = parseInt(content.substring(7,20));
                console.log('length', length);
                chunks.push(chunk);
            }
            let heap = Buffer.concat(chunks);
            if (heap.length >= length) {
                try {
                    console.log('收到数据:', JSON.parse(heap.toString()));
                    let data = JSON.parse(heap.toString());
                    let dataBuff = Buffer.from(JSON.stringify(data));
                    client.write(header);
                    client.write(dataBuff);
                } catch (err) {
                    console.log('数据解析失败');
                }
            }
        }
    });
    client.on('end', () => {
        console.log('收到客户断开');
        clientList.splice(clientList.indexOf(client), 1);
    });
    client.on('error', () => {
        clientList.splice(clientList.indexOf(client), 1);
    });
});
server.listen(9080);
setInterval(broadcast, 10000); // 定时给客户广播 并发送心跳包
function broadcast() {
    console.log('broadcast heartbeat', clientList.length);
    let cleanup = [];
    for(var i=0; i<clientList.length; i++) {
        if(clientList[i].writable) { // 确认 sockets 是否可写
            console.log('一个无效的客户心跳包');
            // 发送心跳报文
            // 清理无效连接,销毁之前用 Socket.destroy() 用 API 的方法销毁。
            cleanup.push(clientList[i]);
            clientList.splice(clientList.indexOf(cleanup[i]), 1);
        }
    }
    for(let i=0; i<cleanup.length; i++) {
        console.log('删除无效的客户:', cleanup[i].name);
        clientList.splice(clientList.indexOf(cleanup[i]), 1);
    }
}

支持自定义协议(含报文头)的Node.js TCP服务器代码

服务端日志
自定义协议服务器运行日志

客户端代码

const net = require('net');
const client = new net.Socket();
const heartBeat = 'HeartBeat'; // 定义心跳包内容确保和平时发送的数据不会冲突
const getHeader = (num) => {
    return ‘length:‘ + (Array(13).join(0) + num).slice(-13);
};
client.connect(9000, '127.0.0.1', function () {});
let chunks = [];
client.on('data', (chunk) => {
    let content = chunk.toString();
    console.log(‘content:‘, content, content.length);
    if (content === heartBeat) {
        console.log('收到服务端发过来的一个心跳包');
    } else {
        if (content.indexOf('length:') === 0) {
            length = parseInt(content.substring(7,20));
            console.log('length', length);
            chunks = [chunk.slice(20, chunk.length)];
        } else {
            chunks.push(chunk);
        }
    }
    let heap = Buffer.concat(chunks);
    console.log('heap.length', heap.length);
    if (heap.length >= length) {
        try {
            console.log('收到数据', JSON.parse(heap.toString()));
        } catch (err) {
            console.log('数据解析失败');
        }
    }
});
// 定时发送数据
setInterval(function () {
    let data = new Date().toUTCString();
    let dataBuff = Buffer.from(JSON.stringify(data));
    let header = getHeader(dataBuff.length);
    client.write(header);
    client.write(dataBuff);
}, 5000);
// 定时发送心跳包
setInterval(function () {
    client.write(heartBeat);
}, 10000);

支持自定义协议(含报文头)的Node.js TCP客户端代码

客户端日志
自定义协议客户端运行日志

上面的例子展示了单个客户端在长连接中顺序处理请求的情况。但如果同一个客户端在极短时间内并发发送多个请求,数据包可能会在服务端交错到达(粘包),导致难以区分哪个数据体对应哪个请求头。为了解决高并发场景下长连接的复用问题,就需要引入连接池机制。

Socket连接池

什么是Socket连接池?“池”的概念意味着资源的集合。Socket连接池,就是维护着一定数量、健康可用的Socket长连接的集合。它能自动检测连接的有效性,剔除无效连接,并维持池内连接的数量。

一个连接池通常包含以下核心功能属性:

  1. 空闲可用的长连接队列
  2. 正在使用的长连接队列
  3. 等待获取空闲连接的请求队列
  4. 无效长连接的剔除功能
  5. 连接池容量配置(最小、最大连接数)
  6. 新建长连接的功能

工作流程:当请求到达时,首先尝试从空闲队列获取一个长连接。如果获取成功,则将该连接移至“正在使用”队列。如果空闲队列为空,且“正在使用”的连接数小于最大池大小,则新建一个连接。如果连接池已满,则该请求进入等待队列。当一个正在使用的连接完成请求后,它被移回空闲队列,并触发检查是否有等待的请求可以获取这个刚刚释放的资源。

下面以Node.js的一个通用连接池模块 generic-pool 为例进行说明。

主要文件目录结构
generic-pool连接池模块项目目录结构

初始化连接池

‘use strict‘
const net = require(‘net‘);
const genericPool = require(‘generic-pool‘);
function createPool(config) {
  let options = Object.assign({
    fifo: true, // 是否优先使用老的资源
    priorityRange: 1, // 优先级
    testOnBorrow: true, // 建立开启取验证
    autostart: true, // 自动初始化连接池并启用
    min: 10, // 初始化连接池时预设的最小数量
    max: 10, // 最大连接池保持的连接数
    evictionRunIntervalMillis: 0, // 资源释放检查间隔(单位毫秒)
    numTestsPerEvictionRun: 3, // 每次释放前检查次数
    softIdleTimeoutMillis: 10000, // 可用的超过了最小的min且空闲时间时间 达到释放
    idleTimeoutMillis: 30000, // 最大等待时间
    maxWaitingClients: 59, // 最大等待
    // config.options
  }, config.options);
  const factory = function () {
    return new Promise((resolve, reject) => {
      let socket = new net.Socket();
      socket.setKeepAlive(true);
      socket.connect(config.port, config.host);
      socket.on(‘connect‘, () => {
        console.log(‘socket_pool‘, config.host, config.port, ‘connect‘);
        resolve(socket);
      });
      socket.on(‘close‘, (err) => { // 关闭事件 close事件
        console.log(‘socket_pool‘, config.host, config.port, ‘close‘, err);
        socket.destroy();
        console.log(‘error‘, (err) => {
          console.log(‘socket_pool‘, config.host, config.port, ‘error‘, err, err);
          reject(err);
        });
      });
    });
  };
  // 销毁连接
  destroy = function (socket) {
    return new Promise((resolve) => {
      // 第一次会触发发close事件 如果有message会触发error事件
      socket.destroy();
      resolve();
    });
  };
  validate = function (socket) { // 伊朗原装油枪检查有效性
    return new Promise((resolve) => {
      if (!socket.destroyed || !socket.readable || !socket.writable) {
        return resolve(false);
      }
      return resolve(true);
    });
  };
  const pool = genericPool.createPool(factory, options);
  pool.on(‘factoryCreateError‘, (err) => {
    const clientResourceRequest = pool._waitingClientsQueue.dequeue();
    if (clientResourceRequest) {
      clientResourceRequest.reject(err);
    }
  });
  return pool;
};
let pool = createPool({
  port: 9000,
  host: ‘127.0.0.1‘,
  options: { min: 0, max: 10 }
});

使用generic-pool创建Socket连接池的初始化代码

使用连接池发起请求

下面我们使用上面定义的连接池和自定义协议来发起请求。

let pool = createPool({ port: 9000, host: ‘127.0.0.1‘, options: {min: 0, max: 10} });
const getHeader = (num) => {
    return ‘length: ‘ + (Array(13).join(0) + num).slice(-13);
}
const request = async (requestDataBuff) => {
    let client;
    try {
        client = await pool.acquire();
    } catch (e) {
        console.log(‘acquire socket client failed: ‘, e);
        throw e;
    }
    let timeout = 10000;
    return new Promise((resolve, reject) => {
        let chunks = [];
        let length = 0;
        client.setTimeout(timeout);
        client.removeAllListeners(‘error‘);
        client.on(‘error‘, (err) => {
            client.removeAllListeners(‘error‘);
            client.removeAllListeners(‘data‘);
            client.removeAllListeners(‘timeout‘);
            pool.destroyed(client);
            reject(err);
        });
        client.on(‘timeout‘, () => {
            client.removeAllListeners(‘error‘);
            client.removeAllListeners(‘data‘);
            client.removeAllListeners(‘timeout‘);
            // 应该销毁以防下一个req的数据事件监听才返回数据
            pool.destroy(client);
            reject(‘socket connect timeout set ${timeout}‘);
        });
        let header = getHeader(requestDataBuff.length);
        client.write(header);
        client.write(requestDataBuff);
        client.on(‘data‘, (chunk) => {
            let content = chunk.toString();
            console.log(‘content‘, content, content.length);
            // TODO 过滤心包
            if (content.indexOf(‘length:‘) === 0) {
                length = parseInt(content.substring(7,20));
                console.log(‘length‘, length);
                chunks=[chunk.slice(20, chunk.length)];
            } else {
                chunks.push(chunk);
            }
            let heap = Buffer.concat(chunks);
            console.log(‘heap.length‘, heap.length);
            if (heap.length >= length) {
                pool.release(client);
                client.removeAllListeners(‘error‘);
                client.removeAllListeners(‘data‘);
                client.removeAllListeners(‘timeout‘);
                try {
                    // console.log(‘收到数据‘, JSON.parse(heap.toString()));
                    resolve(JSON.parse(heap.toString()));
                } catch (err) {
                    reject(err);
                    console.log(‘数据解析失败‘);
                }
            }
        });
    });
}
request(Buffer.from(JSON.stringify({a: ‘a‘})))
    .then((data) => { console.log(‘收到服务的数据‘, data) }).catch(err => { console.log(err); })
request(Buffer.from(JSON.stringify({b: ‘b‘})))
    .then((data) => { console.log(‘收到服务的数据‘, data) }).catch(err => { console.log(err); })
setTimeout(function () {
    //查看是否复用Socket 有没有建立新的连接
    request(Buffer.from(JSON.stringify({c: ‘c‘})))
        .then((data) => { console.log(‘收到服务的数据‘, data) }).catch(err => { console.log(err); })
}, 1000)
request(Buffer.from(JSON.stringify({d: ‘d‘})))
    .then((data) => { console.log(‘收到服务的数据‘, data) }).catch(err => { console.log(err); })

使用连接池进行Socket请求的示例代码

日志输出

socket_pool 127.0.0.1 9000 connect
socket_pool 127.0.0.1 9000 connect
content length:0000000000040"服务端的数据数据:{“a”:“a”} 44
length 40
heap.length 40
收到服务端的数据 服务端的数据数据:{“a”:“a”}
content length:0000000000040"服务端的数据数据:{“b”:“b”} 44
length 40
heap.length 0
content 服务端的数据数据:{“c”:“c”} 24
heap.length 40
收到服务端的数据 服务端的数据数据:{“c”:“c”}
content length:0000000000040"服务端的数据数据:{“d”:“d”} 44
length 40
heap.length 40
收到服务端的数据 服务端的数据数据:{“d”:“d”}

连接池运行日志,显示连接复用情况

从日志可以看到,前两个请求分别建立了新的Socket连接。而在定时器结束后发起的请求,则复用了连接池中已有的连接,没有创建新连接。

连接池源码浅析

generic-pool的核心逻辑位于 lib/Pool.js 文件中。其构造函数定义了空闲资源队列、正在使用的资源队列、等待请求队列等核心数据结构。

关键的资源获取(acquire)和分发(_dispense, _dispatchResource)逻辑,实现了之前描述的连接池工作流程:检查空闲队列、判断是否需要创建新连接、处理等待队列等。这些代码很好地诠释了资源池管理的通用设计模式

通过以上从协议基础到应用实践的梳理,相信你对TCP、HTTP、Socket及连接池有了更系统、深入的理解。掌握这些底层原理,将有助于你设计出更稳定、高效的高并发网络服务。




上一篇:MicroClaw 飞书多 Bot 配置指南:构建一人公司的虚拟团队
下一篇:为什么我从Java转用.NET后,开发效率提升了?
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-3 22:56 , Processed in 0.379394 second(s), 43 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表