找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2401

积分

0

好友

345

主题
发表于 昨天 02:29 | 查看: 5| 回复: 0

在实际业务中,根据 tracking_id 追查日志中一条请求的完整处理路径是一个比较常见的需求。不过,FastAPI 官方并未提供此功能,需要开发者自行实现。本文将介绍如何基于 Python 标准库中的 contextvars,为每一次请求的完整处理流程生成并绑定一个唯一的 tracking_id,并自动记录在日志中。

什么是 contextvars?

Python 在 3.7 版本的标准库中引入了 contextvars 模块,意为“上下文变量”。它通常用于隐式传递环境信息,其作用与 threading.local() 类似。但 threading.local() 是针对线程的数据隔离,而 contextvars 则能完美适配 asyncio 生态的异步协程。

重要提示: contextvars 不仅可用于异步协程,也可以替代 threading.local() 用于多线程函数中。

基础实现步骤

下面我们将分步构建一个完整的 Tracking ID 解决方案。

1. 定义上下文变量

首先,我们创建一个 context.py 文件来定义全局的上下文变量。

import contextvars
from typing import Optional

TRACKING_ID: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    'tracking_id',
    default=None
)

def get_tracking_id() -> Optional[str]:
    """用于依赖注入"""
    return TRACKING_ID.get()

2. 编写中间件

接着,创建一个中间件 (middlewares.py),它在请求到达时生成 tracking_id,并设置在请求头和响应头中。这在客户需要根据此 ID 定位问题时非常有用。

import uuid

from starlette.middleware.base import (BaseHTTPMiddleware,
                                       RequestResponseEndpoint)
from starlette.requests import Request
from starlette.responses import Response

from context import TRACKING_ID

class TrackingIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        tracking_id = str(uuid.uuid4())
        token = TRACKING_ID.set(tracking_id)
        # HTTP 请求头习惯于使用 latin-1 编码
        request.scope["headers"].append((b"x-request-id", tracking_id.encode("latin-1")))

        try:
            resp = await call_next(request)
        finally:
            # 无论是否成功,每次请求结束时重置 tracking_id,避免泄露到下一次的请求中
            TRACKING_ID.reset(token)

        # 可选,在响应中设置跟踪 ID 头
        resp.headers["X-Tracking-ID"] = tracking_id

        return resp

3. 创建 Handler 函数进行测试

为了验证 tracking_id 能否在业务函数中正确获取,我们编写一个模拟的异步数据库查询函数 (handlers.py)。

import asyncio

from context import TRACKING_ID

async def mock_db_query():
    await asyncio.sleep(1)
    current_id = TRACKING_ID.get()
    print(f"This is mock_db_query. Current tracking ID: {current_id}")
    await asyncio.sleep(1)

4. 集成到 FastAPI 主应用

最后,在主程序 main.py 中集成中间件,并创建两个测试端点。

import uvicorn
from fastapi import Depends, FastAPI
from fastapi.responses import PlainTextResponse

from context import TRACKING_ID, get_tracking_id
from handlers import mock_db_query
from middlewares import TrackingIDMiddleware

app = FastAPI()

app.add_middleware(TrackingIDMiddleware)

@app.get("/qwer")
async def get_qwer():
    """测试上下文变量传递"""
    current_id = TRACKING_ID.get()
    print(f"This is get qwer. Current tracking ID: {current_id}")
    return PlainTextResponse(f"Current tracking ID: {current_id}")

@app.get("/asdf")
async def get_asdf(tracking_id: str = Depends(get_tracking_id)):
    """测试依赖注入"""
    print(f"This is get asdf. tracking ID: {tracking_id}")
    await mock_db_query()
    return PlainTextResponse(f"Get request, tracking ID: {tracking_id}")

if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, workers=4)

5. 测试结果

启动服务后,使用 curl 命令测试 API,在控制台和响应头中都能看到一致的 tracking_id

服务端控制台输出示例:

This is get qwer. Current tracking ID: 01b0153f-4877-4ca0-ac35-ed88ab406452
INFO:     127.0.0.1:55708 - "GET /qwer HTTP/1.1" 200 OK
This is get asdf. tracking ID: 0be61d8d-11a0-4cb6-812f-51b9bfdc2639
This is mock_db_query. Current tracking ID: 0be61d8d-11a0-4cb6-812f-51b9bfdc2639
INFO:     127.0.0.1:55722 - "GET /asdf HTTP/1.1" 200 OK

curl 命令输出示例:

$ curl -i http://127.0.0.1:8000/asdf
HTTP/1.1 200 OK
date: Sat, 29 Nov 2025 06:16:49 GMT
server: uvicorn
content-length: 62
content-type: text/plain; charset=utf-8
x-tracking-id: 0be61d8d-11a0-4cb6-812f-51b9bfdc2639

Get request, tracking ID: 0be61d8d-11a0-4cb6-812f-51b9bfdc2639

处理后台任务型 API

FastAPI 的 BackgroundTasks 允许接口快速响应,而将耗时逻辑放在后台异步处理。由于中间件在响应时会重置 tracking_id,后台协程可能无法获取到它。虽然在本地低并发测试中可能正常,但在生产高并发环境下,建议显式传递上下文变量以确保可靠性。

1. 添加后台任务 API

main.py 中添加一个新的端点。

from starlette.background import BackgroundTasks
from handlers import mock_backgroud_task

@app.get("/zxcv")
async def get_zxcv(tasks: BackgroundTasks):
    """测试后台任务"""
    current_id = TRACKING_ID.get()
    print(f"This is get zxcv. The current id is {current_id}")

    # 显式传递 tracking_id
    tasks.add_task(mock_backgroud_task, current_id)

    return PlainTextResponse(f"This is get zxcv. The current id is {current_id}")

2. 改造后台任务函数

修改 handlers.py 中的后台任务函数,使其显式接收并重新设置 tracking_id

from uuid import uuid4
async def mock_backgroud_task(request_tracking_id: Optional[str]):
    if request_tracking_id is None:
        request_tracking_id = str(uuid4())
        print(f"WARNING: No tracking ID found. Generate a new one: {request_tracking_id}")
    token = TRACKING_ID.set(request_tracking_id)
    try:
        # 模拟耗时的后台异步任务
        await asyncio.sleep(5)
        print(f"This is mock backgroud task. Current tracking ID: {request_tracking_id}")
    finally:
        # 确保 tracking ID 被重置
        TRACKING_ID.reset(token)

集成日志自动记录 Tracking ID

前面我们手动将 tracking_id 添加进 print() 语句,这种方式侵入性强且容易遗漏。更优的方案是让日志记录器自动获取并记录 tracking_id

如果你的应用运行在 Docker 或 K8s 上,通常有控制台日志采集工具,此时日志模块只需输出到标准输出即可。若运行在传统服务器上,使用如 filebeat 等工具采集文件日志,则需要解决日志文件体积增长多进程写文件竞争高并发写日志阻塞协程等问题。

以下日志模块示例解决了上述问题,主要特点包括:

  • 自动获取 tracking_id,零侵入。
  • 输出为 JSON 格式,便于接入 ELK Stack 等日志聚合系统进行分析。
  • 可同时输出到控制台和文件。
  • 日志文件按天轮转,默认保留最近7天。
  • 主进程与工作进程写入独立日志文件,避免竞争。
  • 通过队列异步处理日志,避免文件 I/O 阻塞异步协程。

1. 核心日志模块

编辑文件 pkg/log/log.py

import json
import logging
import os
import re
import sys
from logging.handlers import QueueHandler, QueueListener, TimedRotatingFileHandler
from multiprocessing import current_process
from pathlib import Path
from queue import Queue
from typing import Optional

from context import TRACKING_ID

_queue_listener = None
_queue_logger: Optional[Queue] = None
PATTERN_PROCESS_NAME = re.compile(r"SpawnProcess-(\d+)")

class JSONFormatter(logging.Formatter):
    """A logging formatter that outputs logs in JSON format."""
    def format(self, record: logging.LogRecord) -> str:
        log_record = {
            "@timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S%z"),
            "level": record.levelname,
            "name": record.name,
            # "taskName": getattr(record, "taskName", None),  # Record task name if needed
            "processName": record.processName,  # Record process name if needed
            "tracking_id": getattr(record, "tracking_id", None),
            "loc": "%s:%d" % (record.filename, record.lineno),
            "func": record.funcName,
            "message": record.getMessage(),
        }
        return json.dumps(log_record, ensure_ascii=False, default=str)

class TrackingIDFilter(logging.Filter):
    """A logging filter that adds tracking_id to log records.
    """
    def filter(self, record):
        record.tracking_id = TRACKING_ID.get()
        return True

def _setup_console_handler(level: int) -> logging.StreamHandler:
    """Setup a StreamHandler for console logging.

    Args:
        level (int): The logging level.
    """
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(level)
    handler.setFormatter(JSONFormatter())
    return handler

def _setup_file_handler(
    level: int, log_path: str, rotate_days: int
) -> TimedRotatingFileHandler:
    """Setup a TimedRotatingFileHandler for logging.

    Args:
        level (int): The logging level.
        log_path (str): The log path.
        rotate_days (int): The number of days to keep log files.
    """
    handler = TimedRotatingFileHandler(
        filename=log_path,
        when="midnight",
        interval=1,
        backupCount=rotate_days,
        encoding="utf-8",
    )
    handler.setLevel(level)
    handler.setFormatter(JSONFormatter())
    return handler

def _setup_queue_handler(level: int, log_queue: Queue) -> QueueHandler:
    """Setup a QueueHandler for logging.

    Args:
        level (int): The logging level.
        log_queue (Queue): The log queue.
    """
    handler = QueueHandler(log_queue)
    handler.setLevel(level)
    return handler

def _get_spawn_process_number(name: str) -> str:
    """
    Get the spawn process number for log file naming.
    The server should be started with multiple processes using uvicorn's --workers option.
    Prevent issues caused by multiple processes writing to the same log file.

    Args:
        name (str): The name of the log file.

    Returns:
        str: The spawn process number for log file naming.
    """
    try:
        process_name = current_process().name
        pid = os.getpid()

        if process_name == "MainProcess":
            return name
        elif m := PATTERN_PROCESS_NAME.match(process_name):
            return f"{name}-sp{m.group(1)}"
        else:
            return f"{name}-{pid}"

    except:
        return f"{name}-{os.getpid()}"

def _setup_logpath(log_dir: str, name: str) -> str:
    """Setup the log path.

    Args:
        log_dir (str): The log directory.
        name (str): The name of the log file. Example: "app"

    Returns:
        str: The log path.
    """
    main_name = _get_spawn_process_number(name)
    log_file = f"{main_name}.log"
    log_path = Path(log_dir) / log_file

    if not log_path.parent.exists():
        try:
            log_path.parent.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            raise RuntimeError(
                f"Failed to create log directory: {log_path.parent}"
            ) from e
    return str(log_path)

def _validate(level: int, enable_console: bool, enable_file: bool, rotate_days: int) -> None:
    """Validate the log configuration.

    Args:
        level (int): The logging level.
        enable_console (bool): Whether to enable console logging.
        enable_file (bool): Whether to enable file logging.
        rotate_days (int): The number of days to keep log files.

    Raises:
        ValueError: If the log configuration is invalid.
    """
    if not enable_console and not enable_file:
        raise ValueError("At least one of enable_console or enable_file must be True.")

    if level not in [
        logging.DEBUG,
        logging.INFO,
        logging.WARNING,
        logging.ERROR,
        logging.CRITICAL,
    ]:
        raise ValueError("Invalid logging level specified.")

    if not isinstance(rotate_days, int) or rotate_days <= 0:
        raise ValueError("rotate_days must be a positive integer.")

def setup_logger(
    name: str = "app",
    level: int = logging.DEBUG,
    enable_console: bool = True,
    enable_file: bool = True,
    log_dir: str = "logs",
    rotate_days: int = 7,
) -> logging.Logger:
    """Setup a logger with console and/or file handlers.

    Args:
        name (str): The name of the logger. This will be used as the log file name prefix. Defaults to "app".
        level (int): The logging level. Defaults to logging.DEBUG.
        enable_console (bool): Whether to enable console logging. Defaults to True.
        enable_file (bool): Whether to enable file logging. Defaults to True.
        log_dir (str): The log directory. Defaults to "logs".
        rotate_days (int): The number of days to keep log files. Defaults to 7.

    Returns:
        logging.Logger: The configured logger.
    """
    logger = logging.getLogger(name)

    if logger.hasHandlers():
        return logger  # Logger is already configured

    _validate(level, enable_console, enable_file, rotate_days)

    logger.setLevel(level)
    logger.propagate = False  # Prevent log messages from being propagated to the root logger

    log_path = _setup_logpath(log_dir, name)

    handlers = []

    if enable_console:
        handlers.append(_setup_console_handler(level))

    if enable_file:
        handlers.append(_setup_file_handler(level, log_path, rotate_days))

    global _queue_logger, _queue_listener
    if not _queue_logger:
        _queue_logger = Queue(-1)

    queue_handler = _setup_queue_handler(level, _queue_logger)

    if not _queue_listener:
        _queue_listener = QueueListener(
            _queue_logger, *handlers, respect_handler_level=True
        )
        _queue_listener.start()

    logger.addHandler(queue_handler)
    logger.addFilter(TrackingIDFilter())

    return logger

def close_log_queue() -> None:
    """Close the log queue and stop the listener.
    This function should be called when the application is shutting down to ensure that the log queue is closed and the listener is stopped.
    """
    global _queue_listener
    if _queue_listener:
        _queue_listener.stop()
        _queue_listener = None

2. 模块初始化文件

编辑 pkg/log/__init__.py,方便其他模块导入。这里导出了一个配置好的单例 logger

from pathlib import Path

from .log import close_log_queue, setup_logger

logger = setup_logger(
    log_dir=str(Path(__file__).parent.parent.parent / "logs"),
)

__all__ = [
    "logger",
    "setup_logger",
    "close_log_queue",
]

3. 集成到 FastAPI 生命周期

通过 FastAPI 的 lifespan 事件,在应用关闭时安全地关闭日志队列。

from contextlib import asynccontextmanager
from pkg.log import close_log_queue

@asynccontextmanager
async def lifespan(app: FastAPI):
    try:
        yield
    finally:
        close_log_queue()
        print("Shutdown!")

app = FastAPI(lifespan=lifespan)

4. 使用日志记录器替换 print 语句

将业务代码中的 print() 改为 logger.info()

from pkg.log import logger

async def mock_db_query():
    await asyncio.sleep(1)
    current_id = TRACKING_ID.get()
    logger.info(f"This is mock_db_query. Current tracking ID: {current_id}")
    await asyncio.sleep(1)

5. 测试日志输出

启动服务并发起请求,可以看到控制台和日志文件中都自动包含了 tracking_id 字段。

控制台输出 (JSON格式):

{"@timestamp": "2025-11-29T21:07:09+0800", "level": "INFO", "name": "app", "processName": "SpawnProcess-3", "tracking_id": "38a015f7-b0d3-41ea-a2b3-179bf608b4bb", "loc": "main.py:39", "func": "get_asdf", "message": "This is get asdf. tracking ID: 38a015f7-b0d3-41ea-a2b3-179bf608b4bb"}

补充:另一种解决多进程文件写入的思路

在完成上述复杂的日志模块后,想到了另一种更简洁的思路:模仿 Docker/K8s 环境的方案,服务只输出到标准输出(stdout),由外部工具负责日志的收集和轮转。

  1. 简化日志模块:移除所有文件处理逻辑,只保留控制台输出和队列处理器。
  2. 启动时重定向:使用 nohup 或类似工具将进程的标准输出重定向到文件。
    nohup python main.py > logs/app.log 2>&1 &
  3. 外部日志轮转:使用系统的 logrotate 工具来管理日志文件的轮转和清理。

这种方法将日志管理的职责从应用程序中剥离,让应用更专注于业务逻辑,符合单一职责原则。对于使用 Python 进行 Web 开发的团队来说,掌握这种请求链路追踪的技术,能极大提升线上问题排查的效率。如果你想了解更多关于后端架构或具体技术的实践,可以访问云栈社区与其他开发者交流探讨。




上一篇:Anthropic第四份经济指数:复杂任务AI加速更显著,全球生产力影响不均
下一篇:Claude Code记忆插件claude-mem详解:提升AI编程助手长期记忆
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-18 21:32 , Processed in 0.427714 second(s), 38 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表