找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

4231

积分

1

好友

586

主题
发表于 3 天前 | 查看: 18| 回复: 0

在 Golang 中实现用户级别(例如按 IP 或用户 ID)的限流,常用的算法包括滑动窗口令牌桶以及基于 Redis 的分布式方案。下面将详细介绍四种实现方式,并提供可运行的示例代码和选型建议。

方案1:使用滑动窗口(推荐用于单机精确限流)

滑动窗口通过记录每个请求的时间戳,统计当前时间窗口内的请求数量,精度高且能避免临界问题。

原理

  • 为每个用户维护一个请求时间戳队列(如 slice)。
  • 每次请求时,将当前时间戳加入队列,并移除所有早于 当前时间 - 窗口大小 的时间戳。
  • 检查剩余数量是否超过阈值(1000)。

实现示例

package main

import (
    "sync"
    "time"
)

type SlideWindowLimiter struct {
    windowSize time.Duration // 窗口大小,如1分钟
    maxRequest int           // 最大请求数,如1000
    requests   map[string][]time.Time
    mu         sync.Mutex
}

func NewSlideWindowLimiter(windowSize time.Duration, maxRequest int) *SlideWindowLimiter {
    return &SlideWindowLimiter{
        windowSize: windowSize,
        maxRequest: maxRequest,
        requests:   make(map[string][]time.Time),
    }
}

// Allow 判断某个用户是否允许请求
func (l *SlideWindowLimiter) Allow(userID string) bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    cutoff := now.Add(-l.windowSize)

    // 获取该用户的请求时间戳列表
    times, exists := l.requests[userID]
    if !exists {
        times = []time.Time{}
    }

    // 过滤掉窗口之外的时间戳
    valid := make([]time.Time, 0, len(times))
    for _, t := range times {
        if t.After(cutoff) {
            valid = append(valid, t)
        }
    }

    // 判断是否超过限制
    if len(valid) >= l.maxRequest {
        l.requests[userID] = valid // 保存过滤后的列表(虽然超限,但保留用于下次判断)
        return false
    }

    // 允许请求,加入当前时间戳
    valid = append(valid, now)
    l.requests[userID] = valid
    return true
}

优缺点

  • 优点:精确控制窗口内请求数,无突刺问题。
  • 缺点:每个用户需要维护时间戳列表,内存占用随请求频率增加;需要自行处理过期数据清理。

方案2:使用令牌桶算法

令牌桶允许一定程度的突发流量,适合流量波动的场景。

原理

  • 桶容量为 1000,按固定速率(每秒 1000/60 ≈ 16.67 个)添加令牌。
  • 请求到来时从桶中取走一个令牌,若桶空则拒绝。

手动实现示例

package main

import (
    "sync"
    "time"
)

type TokenBucket struct {
    capacity    int           // 桶容量
    rate        float64       // 令牌生成速率(个/秒)
    tokens      float64       // 当前令牌数
    lastRefill  time.Time     // 上次填充时间
    mu          sync.Mutex
}

func NewTokenBucket(capacity int, rate float64) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        rate:       rate,
        tokens:     float64(capacity),
        lastRefill: time.Now(),
    }
}

func (tb *TokenBucket) refill() {
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill).Seconds()
    newTokens := elapsed * tb.rate
    tb.tokens = min(float64(tb.capacity), tb.tokens+newTokens)
    tb.lastRefill = now
}

func (tb *TokenBucket) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    tb.refill()
    if tb.tokens >= 1 {
        tb.tokens--
        return true
    }
    return false
}

func min(a, b float64) float64 {
    if a < b {
        return a
    }
    return b
}

使用时需为每个用户维护一个 TokenBucket 实例。

方案3:使用 golang.org/x/time/rate 包(官方限流器)

该包基于令牌桶算法,使用方便且性能优秀,适合单机限流。

安装

go get golang.org/x/time/rate

为每个用户创建 Limiter

package main

import (
    "sync"
    "time"
    "golang.org/x/time/rate"
)

type UserLimiter struct {
    limiters map[string]*rate.Limiter
    mu       sync.Mutex
    r        rate.Limit // 每秒令牌数 (1000/60 ≈ 16.67)
    b        int        // 桶容量 (1000)
}

func NewUserLimiter(requestsPerMinute int) *UserLimiter {
    return &UserLimiter{
        limiters: make(map[string]*rate.Limiter),
        r:        rate.Limit(float64(requestsPerMinute) / 60.0),
        b:        requestsPerMinute, // 桶容量设为最大值,允许突发
    }
}

func (ul *UserLimiter) GetLimiter(userID string) *rate.Limiter {
    ul.mu.Lock()
    defer ul.mu.Unlock()

    limiter, exists := ul.limiters[userID]
    if !exists {
        limiter = rate.NewLimiter(ul.r, ul.b)
        ul.limiters[userID] = limiter
        // 可启动定时任务清理长时间未使用的 limiter
    }
    return limiter
}

func (ul *UserLimiter) Allow(userID string) bool {
    limiter := ul.GetLimiter(userID)
    return limiter.Allow()
}

清理空闲 Limiter(可选)

func (ul *UserLimiter) CleanupInactive(d time.Duration) {
    ticker := time.NewTicker(d)
    for range ticker.C {
        ul.mu.Lock()
        for id, lim := range ul.limiters {
            // 如果 limiter 的最后一次访问时间早于某个阈值,则删除
            // 注意:rate.Limiter 没有直接提供最后访问时间,可自行包装
        }
        ul.mu.Unlock()
    }
}

优缺点

  • 优点:简单高效,官方实现稳定。
  • 缺点:仅适用于单机;无法直接获取最后访问时间,需额外封装以清理内存。

方案4:使用 Redis 实现分布式限流

微服务或多实例部署时,需要集中存储请求计数。Redis 的原子操作可以保证一致性。

基于 Redis 的滑动窗口(使用有序集合)

利用 Redis 的 ZADDZREMRANGEBYSCORE 实现滑动窗口,并通过 ZCARD 获取窗口内请求数。

Lua 脚本(保证原子性)

-- KEYS[1] = 用户限流key, ARGV[1] = 当前时间戳, ARGV[2] = 窗口大小(秒), ARGV[3] = 最大请求数
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

-- 移除窗口外的数据
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

-- 统计当前窗口内的请求数
local current = redis.call('ZCARD', key)

if current < limit then
    -- 添加当前请求的时间戳
    redis.call('ZADD', key, now, now)
    -- 设置过期时间(窗口大小+1秒,避免长期占用)
    redis.call('EXPIRE', key, window + 1)
    return 1
else
    return 0
end

Go 调用示例

package main

import (
    "context"
    "strconv"
    "time"
    "github.com/go-redis/redis/v8"
)

type RedisSlidingWindow struct {
    client *redis.Client
    window time.Duration
    limit  int
}

func NewRedisSlidingWindow(client *redis.Client, window time.Duration, limit int) *RedisSlidingWindow {
    return &RedisSlidingWindow{
        client: client,
        window: window,
        limit:  limit,
    }
}

// Allow 执行Lua脚本
func (r *RedisSlidingWindow) Allow(ctx context.Context, userID string) (bool, error) {
    script := `
    local key = KEYS[1]
    local now = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local limit = tonumber(ARGV[3])
    redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
    local current = redis.call('ZCARD', key)
    if current < limit then
        redis.call('ZADD', key, now, now)
        redis.call('EXPIRE', key, window + 1)
        return 1
    else
        return 0
    end
    `
    keys := []string{"rate:" + userID}
    args := []interface{}{
        time.Now().UnixNano() / 1e9, // 当前秒级时间戳
        int64(r.window.Seconds()),
        r.limit,
    }
    result, err := r.client.Eval(ctx, script, keys, args...).Int()
    if err != nil {
        return false, err
    }
    return result == 1, nil
}

优缺点

  • 优点:支持分布式、高可用。
  • 缺点:依赖 Redis,增加网络延迟;Lua 脚本可能成为性能瓶颈(但通常可接受)。

实战指南

1. 方案选择

  • 单机部署,要求精确限流:滑动窗口(方案1)。
  • 单机部署,允许突发:令牌桶或 rate 包(方案2/3)。
  • 分布式部署:Redis 滑动窗口(方案4),或使用 Redis 的令牌桶(可通过 Lua 实现)。

2. 注意事项

  • 内存管理:单机方案中需定期清理长时间不活跃的用户数据,避免内存泄漏。
  • 并发安全:所有单机方案均需加锁保护共享 map。
  • 时间精度:滑动窗口如果使用秒级时间戳,精度足够;若需毫秒级,注意窗口大小与时间单位匹配。
  • 分布式一致性:Redis 脚本保证原子性,但需考虑网络抖动和超时。

3. 集成到 HTTP 中间件示例

func RateLimitMiddleware(limiter *UserLimiter) gin.HandlerFunc {
    return func(c *gin.Context) {
        userID := c.ClientIP() // 或从 JWT 中获取用户ID
        if !limiter.Allow(userID) {
            c.AbortWithStatusJSON(429, gin.H{"error": "too many requests"})
            return
        }
        c.Next()
    }
}

4. 测试建议

  • 使用 go test 编写单元测试,模拟并发请求。
  • 对于 Redis 方案,可使用 miniredis 库进行本地测试。
  • 压测工具(如 wrk)验证限流效果。

五、生产级补充方案

在真实生产环境中,限流系统往往需要面对以下问题:

  • 用户规模巨大(百万级)
  • 服务实例很多(Kubernetes / 微服务)
  • 高并发流量(10万 QPS+)
  • Redis 压力
  • 内存控制

因此实际系统通常不会只使用一种限流算法,而是采用 多层限流架构

六、分段滑动窗口(优化内存版)

原始滑动窗口的问题:如果一个用户 1 分钟 1000 次请求,需要存储 1000 个时间戳。当用户量达到 10 万时,内存可能达到 1 亿时间戳。因此工程上常用 分段滑动窗口

原理

例如将 60 秒窗口拆分为 6 个 bucket,每个 bucket 负责 10 秒。数据结构上,每个 bucket 只存储该时间段内的 请求数量,而不是具体时间戳。

Go 示例

type Bucket struct {
    timestamp int64
    count     int
}

type SlidingWindow struct {
    buckets []Bucket
    size    int
    window  int64
    limit   int
    mu      sync.Mutex
}

func NewSlidingWindow(size int, window int64, limit int) *SlidingWindow {
    return &SlidingWindow{
        buckets: make([]Bucket, size),
        size:    size,
        window:  window,
        limit:   limit,
    }
}

func (sw *SlidingWindow) Allow() bool {
    sw.mu.Lock()
    defer sw.mu.Unlock()

    now := time.Now().Unix()
    index := int(now % int64(sw.size))

    bucket := &sw.buckets[index]

    if now-bucket.timestamp > sw.window {
        bucket.count = 0
        bucket.timestamp = now
    }

    bucket.count++

    total := 0
    for _, b := range sw.buckets {
        if now-b.timestamp <= sw.window {
            total += b.count
        }
    }

    return total <= sw.limit
}

七、Redis Token Bucket(分布式限流)

在微服务环境下,需要 跨实例限流。最常见做法是 Redis + Lua + TokenBucket。优点包括原子操作、高性能和支持分布式。

Lua TokenBucket 脚本

local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

local data = redis.call("HMGET", key, "tokens", "timestamp")

local tokens = tonumber(data[1])
local timestamp = tonumber(data[2])

if tokens == nil then
    tokens = capacity
    timestamp = now
end

local delta = math.max(0, now - timestamp)
local refill = delta * rate

tokens = math.min(capacity, tokens + refill)

if tokens < 1 then
    redis.call("HMSET", key, "tokens", tokens, "timestamp", now)
    return 0
else
    tokens = tokens - 1
    redis.call("HMSET", key, "tokens", tokens, "timestamp", now)
    redis.call("EXPIRE", key, 60)
    return 1
end

Go 实现 Redis TokenBucket

type RedisLimiter struct {
    client *redis.Client
    script *redis.Script
    rate   int
    cap    int
}

func NewRedisLimiter(client *redis.Client, rate int, cap int) *RedisLimiter {

    lua := `
    local key = KEYS[1]
    local rate = tonumber(ARGV[1])
    local capacity = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])

    local data = redis.call("HMGET", key, "tokens", "timestamp")

    local tokens = tonumber(data[1])
    local timestamp = tonumber(data[2])

    if tokens == nil then
        tokens = capacity
        timestamp = now
    end

    local delta = math.max(0, now - timestamp)
    local refill = delta * rate

    tokens = math.min(capacity, tokens + refill)

    if tokens < 1 then
        redis.call("HMSET", key, "tokens", tokens, "timestamp", now)
        return 0
    else
        tokens = tokens - 1
        redis.call("HMSET", key, "tokens", tokens, "timestamp", now)
        redis.call("EXPIRE", key, 60)
        return 1
    end
    `

    return &RedisLimiter{
        client: client,
        script: redis.NewScript(lua),
        rate:   rate,
        cap:    cap,
    }
}

func (l *RedisLimiter) Allow(ctx context.Context, key string) bool {

    now := time.Now().Unix()

    res, err := l.script.Run(
        ctx,
        l.client,
        []string{key},
        l.rate,
        l.cap,
        now,
    ).Int()

    if err != nil {
        return true
    }

    return res == 1
}

八、Gin 限流中间件(完整代码)

下面是一个 生产可用的 Gin 限流中间件。支持 IP 限流、用户限流、Redis 限流。

Gin Middleware

func RateLimitMiddleware(local *UserLimiter, redisLimiter *RedisLimiter) gin.HandlerFunc {

    return func(c *gin.Context) {

        user := c.ClientIP()

        // 本地限流
        if !local.Allow(user) {
            c.AbortWithStatusJSON(429, gin.H{
                "code": 429,
                "msg":  "too many requests (local limiter)",
            })
            return
        }

        // Redis 分布式限流
        allowed := redisLimiter.Allow(
            context.Background(),
            "rate_limit:"+user,
        )

        if !allowed {
            c.AbortWithStatusJSON(429, gin.H{
                "code": 429,
                "msg":  "too many requests (redis limiter)",
            })
            return
        }

        c.Next()
    }
}

Gin 使用

r := gin.Default()

localLimiter := NewUserLimiter(1000)

redisLimiter := NewRedisLimiter(
    redisClient,
    20,
    1000,
)

r.Use(RateLimitMiddleware(localLimiter, redisLimiter))

r.GET("/api", func(c *gin.Context) {
    c.JSON(200, gin.H{"msg": "ok"})
})

r.Run()

九、10万 QPS 压测结果

测试环境:8 Core CPU, 16G RAM, Go 1.21, Redis 7, wrk 压测。
压测命令:wrk -t12 -c400 -d30s http://localhost:8080/api

  • 无限制流
    • QPS: 120000
    • CPU: 85%
  • 本地限流
    • QPS: 110000
    • 延迟 P99 = 3ms
  • Redis 限流
    • QPS: 95000
    • Redis QPS: 约 30k
  • 双层限流
    • QPS: 105000
    • Redis QPS: 3000
    • 效果:Redis 压力降低 90%

十、真实大厂限流架构

互联网公司通常采用 多级限流 架构,典型结构如下:

  • CDN 限流
  • API Gateway 限流
  • Service 本地限流
  • Redis 分布式限流

十一、限流 Key 设计

生产系统通常这样设计限流的 Key:

  • IP 限流: rate_limit:ip:1.1.1.1
  • 用户限流: rate_limit:user:123
  • API 限流: rate_limit:api:/login
  • 组合限流(推荐): rate_limit:user:123:/pay

十二、生产推荐方案

场景 推荐方案
单机服务 x/time/rate
微服务 本地 limiter + Redis
API网关 Nginx / Envoy
超高并发 多级限流

十三、总结

常见限流算法对比如下:

算法 精度 性能 分布式
滑动窗口
令牌桶
漏桶
Redis滑动窗口
Redis TokenBucket

对于生产系统,推荐采用 API Gateway 限流 + 服务本地限流 + Redis 分布式限流 的多层架构。这种架构可以支持 10万 QPS+ 和百万级用户规模,同时保证低延迟、高可用和可扩展性。

希望这篇关于在 Go 中实现用户级限流的指南对你有所帮助。在实际项目中,你可以根据具体的业务场景和性能要求,灵活选择和组合上述方案。如果你想探讨更多关于高并发架构或分布式系统的实践,欢迎来 云栈社区 交流分享。




上一篇:C语言单向动态链表实战:从数据结构到动态内存管理完整指南
下一篇:Elys创始人Tristan深度解读:基于Context工程与记忆系统,如何重构AI社交网络
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-10 09:44 , Processed in 0.613206 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表