找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3641

积分

0

好友

489

主题
发表于 1 小时前 | 查看: 4| 回复: 0

昨天还在为开发效率沾沾自喜,今天就在为生产环境焦头烂额。FastAPI 让你 5 分钟写出 Hello World,但真正让它扛住真实流量,完全是另一回事。

这篇文章来自一线实战,包含可直接复用的代码和经过验证的部署策略。读完你会得到一套完整的“生产环境生存指南”:谁可以进来(认证)、能做多少事(限流)、以及如何在不中断服务的情况下升级系统(零停机部署)

一个卡通小熊形象,头戴黄白相间发带,身穿黄色背心和蓝色短裤,右手举着一个黑黄相间的羽毛球拍,面带微笑。小熊右侧有一个灰色对话气泡框,内含逐渐完整显示的中文文字,从‘大家好,我’到‘大家好,我是云朵君!’,文字为黑色手写风格字体,背景为纯白色。

核心概念类比:就像开一家餐厅

想象你开了一家很火的餐厅:

  • 认证 = 门口保安,只让有会员卡的客人进来
  • 限流 = 厨房产能有限,高峰期只能一桌一桌上菜,不能把所有客人都塞进来
  • 零停机部署 = 餐厅在营业时间更换菜单,但客人完全感觉不到,菜还在继续上

下面我们就按这三个维度,一步步搭起来。

1. 认证:让每一次请求都有“身份证”

JWT(JSON Web Token)是当下最主流的认证方案。但生产环境需要特别注意几个细节。

为什么不能只用 JWT 字符串?

很多新手会犯一个错误:直接在代码里写死一个 JWT 密钥,验证通过就行。这相当于给餐厅配了个只认一张会员卡的保安——哪天换卡了(密钥轮换),所有客人都被拒之门外。

正确的做法是使用 OIDC(OpenID Connect)标准,让服务从认证服务器的 JWKS 端点动态获取公钥。想深入了解 Python 在生产环境中的应用,Python 技术栈提供了丰富的认证库支持。

核心依赖:JWKS 自动更新

# auth.py
from fastapi import Depends, HTTPException, status, Request
from jose import jwt
from httpx import AsyncClient
import time
import asyncio

# 缓存 JWKS 公钥,避免每次请求都去拉取
JWKS_CACHE = {"keys": [], "exp": 0}
ISSUER = "https://accounts.example.com"  # 替换为你的认证服务
AUD = "api"  # 预期的 audience
ALGOS = ["RS256"]  # 只允许 RS256
JWKS_URL = f"{ISSUER}/.well-known/jwks.json"

async def load_jwks():
    """从认证服务器加载公钥,带缓存"""
    global JWKS_CACHE
    if time.time() < JWKS_CACHE["exp"]:
        return JWKS_CACHE["keys"]

    async with AsyncClient(timeout=3) as c:
        resp = await c.get(JWKS_URL)
        resp.raise_for_status()
        data = resp.json()

    # 缓存 10 分钟,避免频繁拉取
    JWKS_CACHE = {"keys": data["keys"], "exp": time.time() + 600}
    return JWKS_CACHE["keys"]

async def current_user(request: Request):
    """依赖注入:从请求中提取并验证当前用户"""
    auth = request.headers.get("authorization", "")

    if not auth.startswith("Bearer "):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Missing or invalid token format")

    token = auth.split(" ", 1)[1]
    keys = await load_jwks()

    try:
        claims = jwt.decode(
            token,
            keys,
            algorithms=ALGOS,
            audience=AUD,
            issuer=ISSUER
        )
        return {
            "sub": claims["sub"],
            "scopes": claims.get("scope", "").split()
        }
    except Exception as e:
        # ⚠️ 这里不要打印完整 token,只打印错误类型
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, f"Invalid token: {type(e).__name__}")

在路由中使用

from fastapi import FastAPI, Depends
from auth import current_user

app = FastAPI()

@app.get("/me")
async def me(user=Depends(current_user)):
    """获取当前用户信息"""
    return {"user": user["sub"]}

生产级认证检查清单

⚠️ 注意:90% 的人会在这里踩坑

要点 正确做法 常见错误
Token 有效期 Access ≤ 15分钟,Refresh 7-30天 Access 设置 7 天,泄露风险极高
密钥管理 从 JWKS 动态获取 代码里写死一个密钥
吊销处理 Redis 存 jti(JWT ID),TTL 设 token 有效期 不做吊销,用户登出后 token 依然可用
Cookie vs Header 用 HttpOnly Cookie + CSRF token 单纯用 Cookie 不做防护
日志安全 只记录错误类型,不记录 token 日志里打印完整 token,敏感信息泄露

2. 限流:给你的系统装上“减压阀”

没有限流的系统,就像没有刹车的车。一个恶意用户或一个突发流量就能把整个服务拖垮。

为什么选择 Redis + Token Bucket?

Token Bucket(令牌桶)算法最适合 API 场景:

  • 允许短时突发流量(burst)
  • 长期平均速率可控
  • 分布式环境下用 Redis 保证原子性

类比:就像地铁闸机

  • 每个乘客刷卡需要消耗 1 个令牌
  • 闸机每秒自动补充固定数量的令牌(如 50 个/秒)
  • 高峰时段令牌消耗快,但桶里还有存货(允许突发)
  • 令牌用完 → 闸机关闭,提示“请稍后再试”

基于 Redis Lua 的原子性限流

# rate_limit.py
import aioredis
from fastapi import Request, HTTPException, status
import time

# Lua 脚本在 Redis 中原子执行
LUA_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])  -- 桶容量
local refill_rate = tonumber(ARGV[3])  -- 令牌补充速率(个/秒)
local cost = tonumber(ARGV[4])  -- 本次消耗的令牌数

-- 获取当前桶状态
local data = redis.call("HMGET", key, "tokens", "last_update")
local tokens = tonumber(data[1]) or capacity
local last_update = tonumber(data[2]) or now

-- 补充令牌
local delta = (now - last_update) * refill_rate
tokens = math.min(capacity, tokens + delta)

local allowed = 0
if tokens >= cost then
    tokens = tokens - cost
    allowed = 1
end

-- 保存新状态,设置过期时间
redis.call("HMSET", key, "tokens", tokens, "last_update", now)
redis.call("EXPIRE", key, 3600)

return {allowed, tokens}
"""

class RateLimiter:
    def __init__(self, redis: aioredis.Redis, capacity=100, refill_rate=50):
        """
        capacity: 桶容量(最大突发请求数)
        refill_rate: 令牌补充速率(每秒补充多少令牌)
        """
        self.redis = redis
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.script = self.redis.register_script(LUA_SCRIPT)

    async def check(self, key: str, cost=1) -> bool:
        """检查是否允许请求"""
        # 使用 Redis 的时间戳,保证分布式一致性
        redis_time = await self.redis.time()
        now = float(redis_time[0]) + redis_time[1] / 1_000_000

        allowed, tokens = await self.script(
            keys=[f"rl:{key}"],
            args=[now, self.capacity, self.refill_rate, cost]
        )
        return bool(int(allowed))

    async def get_remaining(self, key: str) -> int:
        """获取剩余令牌数"""
        data = await self.redis.hmget(f"rl:{key}", "tokens")
        return int(data[0]) if data[0] else self.capacity

# 全局限流器实例
limiter = None

async def get_limiter():
    global limiter
    if limiter is None:
        redis = await aioredis.from_url(
            "redis://localhost:6379",
            decode_responses=True
        )
        limiter = RateLimiter(redis, capacity=100, refill_rate=50)
    return limiter

async def rate_limit(request: Request):
    """依赖注入:限流中间件"""
    limiter = await get_limiter()

    # 根据场景选择限流维度
    # 认证用户用 user_id,匿名用 IP
    user_id = request.headers.get("x-user-id")
    if user_id:
        key = f"user:{user_id}"
    else:
        key = f"ip:{request.client.host}"

    allowed = await limiter.check(key)
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Rate limit exceeded. Please slow down.",
            headers={"Retry-After": "30"}  # 告诉客户端 30 秒后重试
        )

在路由中使用

@app.get("/search", dependencies=[Depends(rate_limit)])
async def search(q: str):
    """搜索接口,限制 50 请求/秒,突发 100"""
    # 业务逻辑...
    return {"results": []}

限流策略矩阵

路由类型 限流维度 推荐速率 说明
匿名访问 IP 10-30 req/s 防止爬虫
认证用户 user_id 50-200 req/s 根据业务调整
API 密钥 api_key 100-1000 req/s 合作伙伴可单独配置
登录接口 IP 5 req/min 防止暴力破解
支付接口 user_id 2 req/s 敏感操作低频率

3. 零停机部署:让你的服务永不掉线

“发版就重启”是开发环境的做法。生产环境里,一个重启可能丢掉几十个正在处理的请求。

核心概念:优雅关闭

优雅关闭的意思是:收到终止信号后,不再接收新请求,但继续处理完已有的请求,然后才退出。这背后涉及到深入的 运维/DevOps/SRE 实践。

Uvicorn + Lifespan 最佳实践

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg
import aioredis
import asyncio

@asynccontextmanager
async def lifespan(app: FastAPI):
    """生命周期管理:打开/关闭连接池"""
    # 启动时执行
    app.state.db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/db",
        min_size=5,
        max_size=20
    )
    app.state.redis = await aioredis.from_url(
        "redis://localhost:6379",
        decode_responses=True
    )

    yield  # 应用运行期间

    # 关闭时执行(优雅关闭时会等待)
    await app.state.db_pool.close()
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)

@app.get("/health/live")
async def liveness():
    """存活探针:进程是否还在"""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """就绪探针:依赖是否就绪"""
    try:
        # 检查数据库
        async with app.state.db_pool.acquire() as conn:
            await conn.execute("SELECT 1")
        # 检查 Redis
        await app.state.redis.ping()
        return {"status": "ready"}
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Not ready: {e}"
        )

部署配置(Docker + Kubernetes)

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# 多 worker 启动,优雅关闭超时 30 秒
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", \
     "--workers", "4", "--graceful-timeout", "30"]
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-app
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1        # 允许多启动 1 个 pod
      maxUnavailable: 0  # 不允许有 pod 不可用
  template:
    spec:
      containers:
      - name: app
        image: fastapi:latest
        ports:
        - containerPort: 8000
        livenessProbe:
          httpGet:
            path: /health/live
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health/ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

数据库迁移:Expand/Contract 模式

⚠️ 这是最容易出错的地方:直接修改表结构会锁表,导致请求超时

错误做法

ALTER TABLE users ADD COLUMN role VARCHAR(50) NOT NULL;
-- 此时所有 INSERT 会失败,因为 NOT NULL 约束

正确做法(Expand/Contract)

# 步骤 1: 添加新列(允许 NULL)
"""
Revision: 001_expand_add_role
"""
def upgrade():
    op.add_column('users', sa.Column('role', sa.String(50), nullable=True))

# 步骤 2: 部署应用,应用开始双写(新列和旧列都写)
# 步骤 3: 后台迁移数据
# 步骤 4: 切换到读新列
# 步骤 5: 设置 NOT NULL 约束
def upgrade():
    op.alter_column('users', 'role', nullable=False)

4. 生产级中间件配置

安全头(Security Headers)

from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

app.add_middleware(HTTPSRedirectMiddleware)  # 强制 HTTPS
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com"])

# 自定义安全头
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    response = await call_next(request)
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
    return response

CORS 精确配置

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com", "https://admin.example.com"],  # 不写 "*"
    allow_credentials=True,  # 使用 Cookie 时必须为 True
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Authorization", "Content-Type", "X-Request-ID"],
    expose_headers=["X-Request-ID"],
    max_age=3600,
)

一张卡通风格的动图抽帧图像,展示了一个橙色纸箱,箱体正面印有黄色五角星图案。第一帧仅显示空箱子;后续帧中,一个红发、戴紫色小丑帽、穿紫色上衣的小丑从箱中探出上半身,面带微笑,红鼻子,右手抬起挥手;四帧中小丑姿态略有变化但整体保持在箱内挥手动作,背景为纯黑色。

5. 可观测性:凌晨 2 点的救命稻草

请求 ID 贯穿全链路

from starlette.middleware.base import BaseHTTPMiddleware
import uuid

class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # 优先使用传入的 request-id,否则生成
        request_id = request.headers.get("x-request-id", uuid.uuid4().hex)
        request.state.request_id = request_id

        response = await call_next(request)
        response.headers["x-request-id"] = request_id
        return response

app.add_middleware(RequestIDMiddleware)

结构化日志(JSON 格式)

import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", "unknown"),
            "route": getattr(record, "route", "unknown"),
            "duration_ms": getattr(record, "duration_ms", 0),
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# 配置日志
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
logger = logging.getLogger(__name__)

一个真实的案例

某创业公司的 FastAPI 服务支撑着三个区域的移动客户端:

配置

  • 认证:OIDC + JWKS,access token 12 分钟有效期
  • 限流:认证用户 60 req/s,匿名 IP 10 req/s,令牌桶容量 2 倍
  • 部署:滚动更新,每次更新 20% 的实例,等待新实例就绪 20 秒

结果

  • 月度可用性 99.95%
  • P95 延迟稳定在 120-180ms
  • 部署期间的错误率 降为零(之前每次发版都有 5-10 个超时)

你可能会问:这些真的能避免故障吗?两周后的一次 schema 变更,在 staging 环境触发了 500 风暴——但正是用了 Expand/Contract 模式,生产环境完全没有感知。

最终检查清单(可复制到 README)

  • [ ] JWT 通过 JWKS 验证,支持密钥自动轮换
  • [ ] Access token ≤ 15 分钟,Refresh token ≤ 30 天
  • [ ] 登出时用 Redis 存储 jti 黑名单
  • [ ] Redis 令牌桶实现分布式限流
  • [ ] 返回 Retry-After 头,方便客户端退避
  • [ ] 就绪探针检查 DB + Redis
  • [ ] Uvicorn 配置 --graceful-timeout ≥ p99 请求时长
  • [ ] 数据库迁移使用 Expand/Contract 模式
  • [ ] 全链路请求 ID
  • [ ] JSON 结构化日志,包含 duration、status、route
  • [ ] 安全头 + 严格的 CORS 配置
  • [ ] Lifespan 管理连接池生命周期

核心回顾

  1. 认证不是简单的 JWT 解码,要用 JWKS + 短时 token + 黑名单
  2. 限流是系统的最后一道防线,Redis + 令牌桶是生产标配
  3. 零停机不是玄学,是优雅关闭 + 就绪探针 + 渐进式迁移的组合拳

FastAPI 让开发变得很快,但生产环境需要的是“可预测的快”。当你把认证、限流、部署这三个环节打磨好,发版就不再需要祈祷,凌晨的电话也会越来越少。

好的基础设施,是让团队把精力花在业务上,而不是救火上。如果你在生产落地中遇到其他坑,不妨去 云栈社区 的论坛看看,那里有很多一线开发者在交流实战经验。

你的生产环境用的是什么部署策略?滚动更新、蓝绿部署还是金丝雀?欢迎在评论区分享你的配置,一起聊聊哪些坑值得提前避开。




上一篇:DeepSeek V4发布:百万上下文与万亿参数推理深度解析
下一篇:告别手写 os.getenv:pydantic-settings 配置管理实战指南
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-5-10 03:28 , Processed in 0.826620 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表