FastAPI官方推荐使用 python-jose 库来实现基于 OAuth2 规范的密码哈希与 Bearer + JWT Token 认证流程。本文将详细介绍如何从零开始实现这套安全认证机制。
安装依赖库
首先,需要安装核心的JWT处理库。python-jose 是一个用于生成和验证 JWT (JSON Web Tokens) 的Python库。
pip install python-jose
接下来,安装用于密码哈希加密和验证的 passlib 库。这里我们使用其支持的 bcrypt 算法,这是一种安全且被广泛推荐的密码哈希算法。同时,由于认证过程涉及表单数据解析,还需要安装 python-multipart。
pip install bcrypt
pip install passlib
pip install python-multipart
定义数据模型
我们使用 Pydantic 模型来定义用户和Token的数据结构,这是构建现代 Python 后端应用的高效方式。
from pydantic import BaseModel
# 用户基础信息模型
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
# 扩展用户模型,包含数据库中的哈希密码字段
class UserInDB(User):
hashed_password: str
# 模拟用户数据库
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # 密码是“secret”
"disabled": False,
}
}
# 模拟从数据库查询用户的函数
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
定义Token模型
Token 模型用于API响应的标准化。
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
实现密码哈希与验证
创建密码上下文并实现验证逻辑。
from passlib.context import CryptContext
# 创建密码上下文,指定使用bcrypt算法
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password):
return pwd_context.hash(password)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
实现JWT Token的创建与验证
设置JWT的密钥、算法和过期时间,并编写Token生成函数。
from jose import jwt, JWTError
from datetime import datetime, timedelta, timezone
# 重要:生产环境应从安全配置中读取,切勿硬编码
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
实现依赖项与用户提取
使用 FastAPI 的依赖注入系统来解析请求头中的 Token 并获取当前用户。
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# 指定获取Token的URL路径
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub") # JWT规范中‘sub’键通常存放用户标识
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
创建核心API端点
1. Token获取端点 (/token)
此端点接收用户名和密码,验证成功后返回一个 JWT Token。
from fastapi import FastAPI
app = FastAPI()
@app.post("/token", summary="使用OAuth2密码模式获取JWT Token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 将用户名存入Token的‘sub’字段
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
2. 受保护的用户信息端点 (/users/me)
该端点依赖于 get_current_active_user,只有提供有效Token且用户状态活跃的请求才能访问,是微服务和 云原生 应用常见的认证模式。
@app.get("/users/me/", response_model=User, summary="获取当前已验证用户的信息")
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return current_user
运行与测试
- 启动FastAPI应用(例如使用
uvicorn main:app --reload)。
- 打开交互式API文档
http://127.0.0.1:8000/docs。
- 在
/token 端点尝试使用用户名 johndoe 和密码 secret 进行认证,成功后将获得一个 access_token。
- 点击文档右上角的 “Authorize” 按钮,输入获取到的Token(格式为
Bearer <your_token>)。
- 现在,你可以成功调用
/users/me/ 端点并获取到当前的用户信息,而不再会出现未授权错误。
完整代码示例
以下是将所有模块整合到一个文件中的完整代码,便于理解和测试。在实际项目中,建议将模型、工具函数、依赖项和路由分模块管理,以保持代码结构清晰。
# main.py
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# --- 配置与模型 ---
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
# --- 密码工具 ---
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# --- 依赖项 ---
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# --- 应用与路由 ---
app = FastAPI()
@app.post("/token", response_model=Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return current_user