找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1531

积分

0

好友

225

主题
发表于 3 天前 | 查看: 9| 回复: 0

许多开发者在基于FastAPI构建项目时,通常会优先考虑使用ORM框架,例如SQLModel或SQLAlchemy。然而,在某些特定的业务场景下,完整的ORM可能并非最佳选择。例如,当需要直接复用DBA编写的复杂SQL、希望对查询执行计划与锁进行精细控制,或者项目规模较小希望减少框架引入的复杂度时,直接操作数据库反而更加高效灵活。

那么,在FastAPI框架中,如果不借助SQLModel等ORM,如何实现一种既专业又稳定的MySQL数据库直连方案呢?答案是利用连接池配合原生SQL。本文将详细介绍如何使用pymysql驱动与DBUtils连接池,在FastAPI中构建一个适用于生产环境的数据库访问层。

为什么需要连接池?

首先需要明确:不使用ORM不等于应该直接裸连数据库。在接口中频繁创建和关闭数据库连接是一种低效且危险的做法:

  • 性能开销大:每个请求都需完成TCP握手、身份认证等环节。
  • 连接易泄漏:复杂的代码分支可能导致连接未正确关闭。
  • 资源不可控:高并发下容易耗尽数据库的最大连接数。

因此,结合“手写SQL的灵活性”与“生产环境的稳定性”的最佳实践是:连接池 + 原生SQL。我们选择成熟的pymysql作为驱动,并使用专为数据库连接池设计的DBUtils库。

方案设计概览

我们的目标是实现以下流程:

  1. 应用启动时,初始化一个MySQL连接池。
  2. 请求到来时,从池中借出一个连接。
  3. 业务代码执行SQL后,将连接归还至池中。
  4. 应用关闭时,安全释放所有连接资源。

这类似于一个停车场管理系统:连接池是停车场,每个请求是一辆车,连接则是车位。系统负责车位的分配、回收与总量控制。

环境准备

安装必要的依赖包:

pip install fastapi uvicorn pymysql DBUtils

封装DBUtils连接池

我们创建一个独立的模块(如db.py)来封装连接池的创建与管理逻辑。

# db.py
from contextlib import contextmanager
from typing import Generator
import pymysql
from DBUtils.PooledDB import PooledDB
from fastapi import FastAPI

class MySQLPool:
    """基于 DBUtils 的 MySQL 连接池封装。"""
    def __init__(
        self,
        host: str,
        port: int,
        user: str,
        password: str,
        database: str,
        min_cached: int = 1,
        max_cached: int = 5,
        max_connections: int = 10,
        charset: str = "utf8mb4",
    ) -> None:
        self._pool: PooledDB | None = None
        self._pool_config = {
            "creator": pymysql,  # 使用 pymysql 作为驱动
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
            "charset": charset,
            "mincached": min_cached,  # 启动时创建的空闲连接数量
            "maxcached": max_cached,  # 连接池中最多缓存的连接数量
            "maxconnections": max_connections,  # 最大连接数
            "blocking": True,  # 连接数耗尽时是否阻塞等待
            "ping": 1,  # 检查连接是否可用
        }

    def init_pool(self) -> None:
        """在应用启动时创建连接池。"""
        if self._pool is None:
            self._pool = PooledDB(**self._pool_config)

    def close_pool(self) -> None:
        """在应用关闭时清理连接池(如果需要)。"""
        # DBUtils连接池会自动管理底层连接,此处仅释放引用
        self._pool = None

    @contextmanager
    def connection(self) -> Generator[pymysql.connections.Connection, None, None]:
        """从连接池获取一个连接,使用 with 语法自动归还。"""
        if self._pool is None:
            raise RuntimeError("Connection pool is not initialized")
        conn = self._pool.connection()
        try:
            yield conn
        finally:
            # 调用 close() 方法将连接归还给连接池
            conn.close()

# 全局连接池实例,配置根据实际环境调整
mysql_pool = MySQLPool(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="password",
    database="mydb",
)

def init_app(app: FastAPI) -> None:
    """将连接池生命周期与 FastAPI 应用绑定。"""
    @app.on_event("startup")
    def _startup() -> None:
        mysql_pool.init_pool()

    @app.on_event("shutdown")
    def _shutdown() -> None:
        mysql_pool.close_pool()

关键点解析

  • MySQLPool类封装了DBUtils.PooledDB的核心配置与操作。
  • @contextmanager装饰器让connection()方法支持with语句,确保连接自动归还。
  • init_app函数将连接池的初始化与销毁挂钩到FastAPI的生命周期事件上。

在FastAPI路由中使用连接池

接下来,在主应用文件(如main.py)中,我们通过FastAPI的依赖注入系统来获取数据库连接。

# main.py
from typing import Generator, List
from fastapi import Depends, FastAPI
from db import init_app, mysql_pool

app = FastAPI(title="FastAPI MySQL Pool Demo")
init_app(app)

def get_db_conn() -> Generator:
    """FastAPI 依赖项:从连接池获取一个连接。"""
    with mysql_pool.connection() as conn:
        yield conn

@app.get("/users")
def list_users(conn=Depends(get_db_conn)) -> List[dict]:
    """查询用户列表,演示 SELECT 操作。"""
    with conn.cursor() as cursor:
        sql = "SELECT id, username, email FROM users ORDER BY id DESC LIMIT 20"
        cursor.execute(sql)
        rows = cursor.fetchall()
    # 将元组结果转换为字典列表
    column_names = [desc[0] for desc in cursor.description]
    return [dict(zip(column_names, row)) for row in rows]

@app.post("/users")
def create_user(username: str, email: str, conn=Depends(get_db_conn)) -> dict:
    """新建用户,演示 INSERT 操作。"""
    with conn.cursor() as cursor:
        sql = """
        INSERT INTO users (username, email)
        VALUES (%s, %s)
        """
        cursor.execute(sql, (username, email))
        conn.commit()
        user_id = cursor.lastrowid
    return {"id": user_id, "username": username, "email": email}

代码说明

  • 使用Depends(get_db_conn)将数据库连接注入到每个路由函数。
  • 所有SQL语句均为手写,可直接嵌入复杂查询或存储过程调用。
  • 使用参数化查询(%s)防止SQL注入攻击。
  • 默认返回元组数据,可通过配置游标类DictCursor直接返回字典。

若希望查询结果直接返回字典格式,可在创建连接池时配置游标类:

from pymysql.cursors import DictCursor
# 在 MySQLPool._pool_config 字典中添加:
# "cursorclass": DictCursor,

方案对比:连接池 vs. 裸连

为清晰对比,以下是一种常见的无连接池写法(多见于入门教程):

@app.get("/users")
def list_users():
    conn = pymysql.connect(host="127.0.0.1", user="root", ...) # 每次请求新建连接
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT ...")
            rows = cursor.fetchall()
        return rows
    finally:
        conn.close() # 必须记得关闭

裸连方式的缺点

  • 性能瓶颈:高频请求下,反复创建/销毁连接消耗巨大。
  • 资源风险:若忘记关闭连接,将导致连接泄漏。
  • 缺乏管控:连接数随并发量线性增长,易冲击数据库上限。

连接池方式的优势

  • 资源复用:连接预先创建,请求间复用,大幅降低开销。
  • 安全可控:通过maxconnections等参数限制总连接数,保护数据库。
  • 自动管理:借助with上下文和依赖注入,连接借用归还有序且安全。

生产环境优化建议

  1. 参数调优:与DBA协作,根据实际QPS和SQL耗时,合理设置mincachedmaxcachedmaxconnectionsblocking=True可在连接耗尽时等待而非报错,提升系统韧性。
  2. 抽象数据层:建议进一步封装RepositoryDAO层,将SQL细节收敛,业务代码面向对象或数据结构编程。
  3. 监控与日志:记录关键SQL的执行时间与影响行数,对慢查询进行监控告警,便于持续优化。

总结

在FastAPI项目中,无需依赖完整的ORM,通过pymysql + DBUtils连接池 + 手写SQL的组合,完全可以构建出高效、稳定的数据库访问方案。该方案的核心在于:

  • 利用DBUtils管理连接池,保障资源可控与高性能。
  • 通过FastAPI依赖注入,规范化连接的获取与释放流程。
  • 保持手写SQL的灵活性,便于复杂查询优化与已有脚本复用。

对于真实项目,建议在数据访问层进行适度封装,从而兼顾开发的便捷性与架构的清晰度。




上一篇:中国商业航天五强冲刺IPO:可重复使用火箭技术路线与市场格局分析
下一篇:基于GitHub Actions的CI/CD流水线实战:从开发到生产的自动化部署指南
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 22:56 , Processed in 0.224461 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表