找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

501

积分

0

好友

69

主题
发表于 前天 01:16 | 查看: 11| 回复: 0

上一篇讲解 Pydantic 时,有朋友提出疑问:“校验逻辑明白了,但校验好的数据如何存入数据库呢?”“几个工具放一起,不太清楚它们如何协同工作。”

对于初学者来说,刚接触“Web框架 + 数据校验 + 数据库”这种组合时,确实容易被“模型”、“会话”、“联动”等概念困扰。其实其核心流程非常简单,就像我们去快递站寄包裹:

  • Pydantic 如同“安检员”:先检查你要寄的物品(请求数据)是否符合规定(格式、长度等),不合格的直接退回。
  • SQLAlchemy 如同“仓库管理员”:负责将安检通过的物品(合法数据)存入仓库(数据库),并在需要时取出。
  • FastAPI 如同“项目经理”:负责协调安检员和仓库管理员,确保流程顺畅——先安检、再入库,取货后再整理好交还给寄件人(前端)。

本文将以“寄快递”的类比贯穿始终,手把手带您搭建一个完整的项目。从环境配置到每一行代码,都会解释清楚“为何这样写”以及“它对应实际流程的哪一步”,您可以轻松地复制、粘贴并运行。

一、核心流程解析:数据如何“从前端到数据库”?

我们以“用户注册”为例,将整个数据流转流程拆解为5个步骤,先建立一个全局认知:

  1. 用户在前端填写注册信息(如用户名、密码、邮箱),点击“提交”。
  2. FastAPI 接收数据,并将其交给 Pydantic 进行“安检”:检查用户名长度、邮箱格式、两次输入的密码是否一致等。
  3. 安检通过!FastAPI 将合法数据转交给 SQLAlchemy。
  4. SQLAlchemy 整理数据,并将其存入数据库(例如存储用户名、加密后的密码、邮箱)。
  5. 最后,SQLAlchemy 将存入的信息取出,经由 Pydantic 整理格式(过滤掉密码哈希等敏感字段),再通过 FastAPI 返回给前端,告知用户“注册成功”。

牢记这个流程,后续编写代码时思路会更加清晰——我们所有的操作都是为了实现这5个步骤。

二、环境准备:安装必要的工具

如同寄快递需要纸箱和胶带,我们需要先将必要的依赖库安装好。打开终端,执行以下命令:

# 核心框架、服务器与数据校验库
pip install fastapi uvicorn pydantic==2.4.2

# 异步数据库ORM与MySQL驱动
pip install sqlalchemy aiomysql

# 环境变量管理(避免敏感信息硬编码)
pip install python-dotenv

# 密码哈希加密工具
pip install passlib[bcrypt]

简单了解每个工具的作用即可:

  • aiomysql:作为 SQLAlchemy 与 MySQL 数据库之间的异步驱动,确保数据库I/O操作不会阻塞其他任务。
  • python-dotenv:用于管理敏感信息,如数据库密码和地址,避免直接写在代码中。
  • passlib[bcrypt]:用于将用户明文密码加密成不可逆的哈希值存储,即使数据库泄露,攻击者也无法直接获取原始密码。

三、项目结构搭建

在开始编码前,先规划好项目结构。创建一个新的项目文件夹,并建立以下文件和目录:

fastapi-user-demo/
├── .env           # 存储环境变量(敏感信息)
├── main.py        # 主程序入口(FastAPI应用)
├── models/        # 数据模型目录
│   ├── db.py      # SQLAlchemy ORM模型(数据库表结构)
│   └── schemas.py # Pydantic模型(数据校验与序列化)
├── database.py    # 数据库连接与会话配置
└── utils.py       # 工具函数(如密码哈希)

3.1 配置环境变量 (.env 文件)

创建并编辑 .env 文件,填入您的数据库连接信息(请替换为您的实际信息):

# .env
DATABASE_URL=mysql+aiomysql://root:your_password@localhost:3306/fastapi_demo
SECRET_KEY=your_super_secret_key_here
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30

提示DATABASE_URL 的格式为 数据库驱动://用户名:密码@主机:端口/数据库名。请确保MySQL中已创建名为 fastapi_demo 的数据库。

3.2 配置数据库连接 (database.py)

此文件负责创建数据库连接引擎和会话工厂,是SQLAlchemy与数据库沟通的桥梁。

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import declarative_base
from dotenv import load_dotenv
import os

# 加载 .env 文件中的环境变量
load_dotenv()

# 获取数据库连接字符串
DATABASE_URL = os.getenv("DATABASE_URL")

# 创建异步引擎
async_engine = create_async_engine(
    DATABASE_URL,
    echo=True,   # 开发时启用,打印执行的SQL语句
    future=True
)

# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    autoflush=False,
    autocommit=False,
    expire_on_commit=False
)

# 声明ORM模型基类
Base = declarative_base()

# 依赖项:为每个请求创建独立的数据库会话,请求结束后自动关闭
async def get_db():
    db = AsyncSessionLocal()
    try:
        yield db
    finally:
        await db.close()

四、核心模型定义:校验规则与数据表结构

这是实现“校验-存储”联动的关键。我们需要定义两类模型:

  • Pydantic 模型 (schemas.py):定义数据进入和离开系统时的规则(验证、序列化)。
  • SQLAlchemy 模型 (db.py):定义数据在数据库中的存储结构。

4.1 定义SQLAlchemy ORM模型 (models/db.py)

此文件定义了数据库中的“用户表”结构。

from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.sql.sqltypes import DateTime
from sqlalchemy.sql.expression import text
from database import Base

class DBUser(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    email = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)  # 存储加密后的密码
    full_name = Column(String, index=True, nullable=True)
    disabled = Column(Boolean, default=False)
    created_at = Column(DateTime(timezone=True), server_default=text("now()"))

关键点hashed_password 字段存储的是加密后的密码哈希,而非明文。

4.2 定义Pydantic模型 (models/schemas.py)

此文件定义了数据校验与序列化的规则。

from pydantic import BaseModel, Field, validator
from typing import Optional
from datetime import datetime

# 1. 基础模型
class UserBase(BaseModel):
    username: str = Field(..., min_length=3, max_length=20)
    email: str = Field(..., pattern=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
    full_name: Optional[str] = Field(None)

# 2. 用户创建模型(用于注册)
class UserCreate(UserBase):
    password: str = Field(..., min_length=6, max_length=20)
    confirm_password: str = Field(..., min_length=6, max_length=20)

    @validator("confirm_password")
    def passwords_match(cls, v, values):
        if "password" in values and v != values["password"]:
            raise ValueError("两次密码不一致")
        return v

# 3. 用户响应模型(返回给前端)
class UserResponse(UserBase):
    id: int
    disabled: bool
    created_at: datetime

    class Config:
        from_attributes = True  # 允许从ORM对象转换,替代V1的orm_mode

# 4. 用户更新模型
class UserUpdate(BaseModel):
    email: Optional[str] = None
    full_name: Optional[str] = None
    password: Optional[str] = Field(None, min_length=6)
    confirm_password: Optional[str] = Field(None, min_length=6)

    @validator("confirm_password")
    def passwords_match(cls, v, values):
        if "password" in values and values["password"] is not None and v != values["password"]:
            raise ValueError("两次密码不一致")
        return v

核心配置UserResponse 模型中的 from_attributes = True 是实现SQLAlchemy查询结果自动序列化的关键。

4.3 工具函数:密码哈希 (utils.py)

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password: str) -> str:
    """生成密码哈希值"""
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证密码与哈希是否匹配"""
    return pwd_context.verify(plain_password, hashed_password)

五、API接口实战:实现完整的CRUD

现在,我们将编写5个核心API接口,串联起整个“校验-存储”流程。编辑 main.py 文件。

from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, AsyncGenerator
from contextlib import asynccontextmanager

from database import get_db, Base, async_engine
from models.db import DBUser
from models.schemas import UserCreate, UserResponse, UserUpdate
from utils import get_password_hash

# 应用生命周期管理
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # 启动时创建数据库表
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield

# 创建FastAPI应用实例
app = FastAPI(title="用户管理API示例", lifespan=lifespan)

# --- 1. 用户注册接口 ---
@app.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user_data: UserCreate,  # 触发Pydantic自动校验
    db: AsyncSession = Depends(get_db)
):
    # 检查用户名或邮箱是否已存在
    result = await db.execute(
        select(DBUser).where(
            (DBUser.username == user_data.username) | (DBUser.email == user_data.email)
        )
    )
    if result.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="用户名或邮箱已存在")

    # 密码哈希加密
    hashed_pwd = get_password_hash(user_data.password)

    # 创建ORM对象
    db_user = DBUser(
        username=user_data.username,
        email=user_data.email,
        full_name=user_data.full_name,
        hashed_password=hashed_pwd
    )

    # 保存到数据库
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)

    # 返回数据,自动按UserResponse模型序列化
    return db_user

# --- 2. 获取用户列表 ---
@app.get("/users/", response_model=List[UserResponse])
async def read_users(
    db: AsyncSession = Depends(get_db),
    skip: int = 0,
    limit: int = 10
):
    result = await db.execute(select(DBUser).offset(skip).limit(limit))
    return result.scalars().all()

# --- 3. 获取单个用户 ---
@app.get("/users/{user_id}", response_model=UserResponse)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(DBUser).where(DBUser.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户未找到")
    return user

# --- 4. 更新用户 ---
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    update_data: UserUpdate,
    db: AsyncSession = Depends(get_db)
):
    result = await db.execute(select(DBUser).where(DBUser.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户未找到")

    update_dict = update_data.dict(exclude_unset=True)

    # 处理密码更新
    if "password" in update_dict:
        update_dict["hashed_password"] = get_password_hash(update_dict.pop("password"))
    if "confirm_password" in update_dict:
        update_dict.pop("confirm_password")

    # 更新字段
    for key, value in update_dict.items():
        setattr(user, key, value)

    await db.commit()
    await db.refresh(user)
    return user

# --- 5. 删除用户 ---
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(DBUser).where(DBUser.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户未找到")

    await db.delete(user)
    await db.commit()

# 运行开发服务器
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)

重点create_user 接口完整演示了 “Pydantic校验 → 业务检查 → 密码加密 → 数据库存储 → 序列化返回” 的闭环流程。

六、测试与验证

FastAPI 提供了强大的交互式API文档,方便测试。

  1. 启动应用:在终端运行 python main.py
  2. 访问文档:打开浏览器,访问 http://127.0.0.1:8000/docs。您将看到自动生成的Swagger UI界面。

测试注册接口

  1. 在文档中找到 POST /users/ 接口,点击 “Try it out”。
  2. 在请求体中填入测试JSON数据:
    {
      "username": "testuser",
      "email": "test@example.com",
      "full_name": "Test User",
      "password": "mypassword123",
      "confirm_password": "mypassword123"
    }
  3. 点击 “Execute”。如果成功,响应体中会返回用户信息(不包含密码字段)。
  4. 尝试错误场景,例如将 confirm_password 改为不同的值,观察Pydantic返回的验证错误。

其他接口(查询、更新、删除)均可用同样方式进行测试。

七、核心总结与避坑指南

7.1 常见问题

  1. 忘记 await:所有SQLAlchemy的异步操作(如 execute, commit)必须使用 await
  2. 明文存储密码:务必使用 get_password_hash 对密码进行加密处理,这是基础的安全实践
  3. 序列化错误:确保Pydantic响应模型(如 UserResponse)中设置了 from_attributes = True
  4. 数据库连接失败:仔细检查 .env 文件中的 DATABASE_URL 格式及MySQL服务是否运行。
  5. 表不存在:首次运行前,需确保在MySQL中创建了对应的数据库(如 fastapi_demo)。

7.2 技术要点总结

“FastAPI + Pydantic + SQLAlchemy” 组合的核心分工非常清晰:

  1. Pydantic:专职于数据边界。负责验证输入数据的合法性,并格式化输出数据。
  2. SQLAlchemy:专职于数据持久化。负责与数据库交互,执行增删改查操作。
  3. FastAPI:专职于流程协调与暴露。作为桥梁,接收请求、调度校验与存储逻辑、返回响应。

对于初学者,无需一开始就深究每个库的底层原理。首先按照本文的步骤将整个流程跑通,建立起直观的理解。之后,可以在此基础上继续探索数据库迁移(Alembic)、用户认证(JWT)、生产环境部署等进阶主题。




上一篇:CVE-2025-55182漏洞深度解析:Next.js App Router条件下RCE风险与修复指南
下一篇:UART、I2C、SPI、RS485、CAN通信协议全面对比与实战应用指南
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-9 08:21 , Processed in 0.079903 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表