在前面的章节中,我们已陆续实现了基础的HTTP/1.1请求、连接复用、请求头/响应头解析、超时控制、TLS支持、DNS缓存以及连接池的雏形。
今天,我们进入关键的阶段实战环节:将这些零散的部件整合成一个能够进行压测、并且在大部分场景下可与 http.DefaultTransport 对标(结构已非常接近)的可运行客户端。
最终目标是:实现一个名为 kofer.RoundTripper 的结构,它可以直接用于 http.Client{Transport: xxx}。
一、回顾 Go 标准库 Transport 的核心骨架
通过阅读 src/net/http/transport.go,我们可以提炼出最核心的几个结构体和流程。
type Transport struct {
// 连接池相关(最重要!)
idleConn map[connectMethodKey]*persistConnQueue // 空闲连接
connsPerHost map[connectMethodKey]int // 正在使用的连接数
maxConnsPerHost int
// Dial 相关
DialContext func(ctx context.Context, network, addr string)(net.Conn, error)
// TLS
TLSClientConfig *tls.Config
TLSHandshakeTimeout time.Duration
// 各种超时
IdleConnTimeout time.Duration
ResponseHeaderTimeout time.Duration
ExpectContinueTimeout time.Duration
// Proxy
Proxy func(*Request)(*url.URL, error)
// ...
}
一次 RoundTrip 的主流程大致如下:
- 根据 scheme/host/port 得到
connectMethodKey
- 尝试从空闲连接池取一个
persistConn
- 如果没有 → 新建连接(Dial → TLS握手 → 如果是代理则发送 CONNECT)
- 得到
persistConn 后,发送请求头与 body
- 读取响应头
- 根据
Connection: keep-alive / HTTP/1.1 默认行为决定是否将连接放回连接池
- 返回 Response
今天我们将尽量贴近这个结构,但会实现一个简化版本。
二、阶段实战目标代码结构
kofer-http/
├── client.go // 使用示例 + http.Client 包装
├── roundtripper.go // 主 RoundTripper 实现
├── persist_conn.go // 持久连接封装
├── conn_pool.go // 简易连接池(map + mutex + idle list)
├── dial.go // 自定义拨号 + DNS缓存(可复用前面的)
└── utils.go // 一些小工具(header parse, read response 等)
下面直接给出核心代码(已大幅精简,注释详细)。
1. persist_conn.go —— 持久连接核心
package koferhttp
import (
"bufio"
"context"
"crypto/tls"
"io"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
)
type persistConn struct {
mu sync.Mutex
conn net.Conn // 底层 TCP 或 TLS 连接
reader *bufio.Reader // 用于读取响应
writer *bufio.Writer // 用于写请求
sawEOF bool
closed atomic.Bool
createdAt time.Time
// 用于放回连接池的钥匙
key connectMethodKey
// 当连接被从池中取出后,这个 channel 用于通知有人正在使用
onCloseCh chan struct{}
}
func (pc *persistConn) RoundTrip(req *http.Request) (*http.Response, error) {
pc.mu.Lock()
if pc.closed.Load() {
pc.mu.Unlock()
return nil, ErrClosed
}
pc.mu.Unlock()
// 1. 写请求
if err := pc.writeRequest(req); err != nil {
pc.markDead()
return nil, err
}
// 2. 读响应头
resp, err := pc.readResponse(req)
if err != nil {
pc.markDead()
return nil, err
}
// 3. 判断是否可以复用
if !pc.shouldKeepAlive(req, resp) {
pc.markDead()
}
return resp, nil
}
func (pc *persistConn) writeRequest(req *http.Request) error {
// 省略大部分 header 构建逻辑(可参考前几篇)
// 这里只写关键部分
_, err := fmt.Fprintf(pc.writer, "%s %s HTTP/1.1\r\n", req.Method, req.URL.RequestURI())
if err != nil {
return err
}
// Host、User-Agent、Content-Length 等...
// ...
if req.Body != nil {
// chunked 或 content-length 写 body
}
return pc.writer.Flush()
}
func (pc *persistConn) readResponse(req *http.Request) (*http.Response, error) {
resp, err := http.ReadResponse(pc.reader, req)
if err != nil {
return nil, err
}
// 修复常见问题:Transfer-Encoding: chunked 时 Body 是 chunked reader
return resp, nil
}
func (pc *persistConn) markDead() {
pc.mu.Lock()
defer pc.mu.Unlock()
if !pc.closed.CompareAndSwap(false, true) {
return
}
pc.conn.Close()
// 通知外面有人在等这个连接关闭
if pc.onCloseCh != nil {
close(pc.onCloseCh)
}
}
func (pc *persistConn) shouldKeepAlive(req *http.Request, resp *http.Response) bool {
if resp.Close {
return false
}
// HTTP/1.0 + 无 Connection: keep-alive → 关闭
if resp.ProtoMajor == 1 && resp.ProtoMinor == 0 {
v := resp.Header.Get("Connection")
return strings.EqualFold(v, "keep-alive")
}
// HTTP/1.1 默认 keep-alive,除非 Connection: close
return !strings.EqualFold(resp.Header.Get("Connection"), "close")
}
2. conn_pool.go —— 极简版连接池
type connectMethodKey struct {
Scheme string
Addr string // host:port
// 如果有 proxy,还可以加 ProxyURL 等字段
}
type Transport struct {
mu sync.Mutex
// 空闲连接,按 key 分桶
idle map[connectMethodKey][]*persistConn
// 正在使用的连接计数(限流用)
connsInUse map[connectMethodKey]int
// 配置
MaxIdleConns int
MaxIdleConnsPerHost int
MaxConnsPerHost int
IdleConnTimeout time.Duration
DialContext func(ctx context.Context, network, addr string)(net.Conn, error)
TLSClientConfig *tls.Config
}
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
cm := connectMethod{...} // 从 req 构建 key(http/https/proxy)
key := cm.key()
// 阶段1:尝试复用空闲连接
pc := t.getIdleConn(key, ctx)
if pc != nil {
resp, err := pc.RoundTrip(req)
if err == nil {
t.putOrClose(pc) // 根据是否可用放回或关闭
return resp, nil
}
// 失败了就关闭这条连接
pc.markDead()
}
// 阶段2:新建连接
conn, err := t.dialNewConn(ctx, cm)
if err != nil {
return nil, err
}
pc = &persistConn{
conn: conn,
reader: bufio.NewReaderSize(conn, 4*1024),
writer: bufio.NewWriterSize(conn, 4*1024),
createdAt: time.Now(),
key: key,
}
// 阶段3:执行请求
resp, err := pc.RoundTrip(req)
if err != nil {
pc.markDead()
return nil, err
}
// 阶段4:放回池子(如果还能复用)
t.putOrClose(pc)
return resp, nil
}
// getIdleConn、putOrClose、dialNewConn 等方法留给大家实现(提示:加锁、清理过期、限流判断)
三、阶段小结与压测建议
至此,我们已经实现了一个结构上非常接近标准库的 RoundTripper:
- 拥有连接池(idle + in-use 计数)
- 支持 keep-alive 判断
- 支持 TLS
- 支持上下文取消
- 具备基础的超时体系(可继续完善)
推荐压测对比维度(使用 hey 或 wrk):
| 指标 |
http.DefaultTransport |
你的 kofer.RoundTripper |
备注 |
| QPS (keep-alive) |
~8-9w |
? |
开启长连接 |
| QPS (每次新建连接) |
~1.2-1.8w |
? |
关闭 keep-alive |
| 99% 延迟 |
|
|
|
| 连接数峰值 |
|
|
MaxIdleConnsPerHost=100 |
| 内存增长 |
|
|
跑 30s 观察 |
尝试实现并运行一下吧。对于网络编程和 HTTP/1.1 的底层原理有更深的理解,欢迎在 Go 相关的技术社区如 云栈社区 分享你的实现心得与压测结果。