在 Flask 开发中,SQLAlchemy 作为强大的 Python ORM 工具,其数据查询功能是我们日常工作中的重中之重。本文将详细解析 SQLAlchemy 的各种数据查询操作,涵盖从基础到复杂的多种场景。
基本查询方法与过滤器
在深入具体查询前,我们先了解 SQLAlchemy 提供的常用查询过滤器和结果处理方法。
常用查询过滤器:
| 过滤器 |
说明 |
filter() |
添加过滤器到原查询,返回新查询对象。 |
filter_by() |
添加等值过滤器,返回新查询对象。 |
limit() |
限制返回的结果数量。 |
offset() |
偏移原查询结果,常用于分页。 |
order_by() |
根据指定条件对结果排序。 |
group_by() |
根据指定条件对结果分组。 |
常用查询结果方法:
| 方法 |
说明 |
all() |
以列表形式返回所有结果。 |
first() |
返回第一个结果(模型对象),无则返回 None。 |
first_or_404() |
返回第一个结果,无则抛出 404 异常。 |
count() |
返回查询结果的数量。 |
paginate() |
返回一个分页器对象。 |
get() |
根据主键查询,返回对应模型对象,无则返回 None。 |
get_or_404() |
根据主键查询,无则抛出 404 异常。 |
详解核心查询方法
1. get():主键查询
get() 方法通过主键查询单条记录。查询前,我们先看下示例数据表 tb_student2 的内容。

@app.route("/get")
def get():
"""按主键获取一条,有以下几种查询方式"""
# student = Student.query.get({'id': 11})
# student = Student.query.get((11,))
# student = db.session.query(Student).get(11)
student = Student.query.get(11)
print(student)
return "ok"
访问 /get 路由,成功获取到主键为 11 的学生数据。

注意:在终端输出中,可以看到关于 Query.get() 方法的过时警告。在 SQLAlchemy 2.0 中,推荐使用 Session.get()。
2. all():查询所有对象
# 如果不设置条件,则默认查询全表
student_list = Student.query.all()
print(student_list)
# 设置过滤条件查询全部结果
# 如果查不到数据,返回空列表
student_list = Student.query.filter(Student.sex==False).all()
print(student_list)
# all()的返回值是一个Python列表,可以直接使用切片,与Django的QuerySet不同。
student_list = Student.query.all()[1:]
print(student_list)

第二个查询返回了结果,因为数据库中原本存在 sex 为 False(值为0)的记录。

如果将这条记录的 sex 字段改为 True(1),再次查询。

此时,针对 sex==False 的查询将返回一个空列表。

3. count():返回查询结果数量
# 如果不设置过滤条件,则默认统计全表记录的数量
total = Student.query.count()
print(total)
# 设置条件,返回满足条件的记录数量
total = Student.query.filter(Student.age>10).count()
print(total)
执行上述代码,可以统计出所有学生以及年龄大于10的学生的数量。

4. first():返回第一个对象
first() 返回的是单个模型对象。
"""获取查询结果的第一个结果"""
student = Student.query.first()
print(student, student.name)
student = Student.query.filter(Student.sex==True).first()
print(student, student.name)
"""获取查询结果的第二个结果,注意现在不支持负索引了"""
student = Student.query.filter(Student.sex==True)[1]
print(student, student.name)

条件查询详解
基于filter的模糊查询
filter() 方法功能强大,支持多种模糊匹配。
# 名字包含“黑”的学生
student_list = Student.query.filter(Student.name.contains("黑")).all()
print(student_list)
# 名字以“小”开头的学生
student_list = Student.query.filter(Student.name.startswith("小")).all()
print(student_list)
# 名字以“兰”结尾的学生
student_list = Student.query.filter(Student.name.endswith("兰")).all()
print(student_list)

基于filter的比较查询
比较查询的格式为 filter(模型.字段 比较运算符 值)。
# 比较运算符:==, !=, >, <, >=, <=
# 单个条件的比较查询
student_list = Student.query.filter(Student.age>15).all()
print(student_list)
# 多个条件的比较查询
# 要求多个条件都要满足,相当于逻辑查询中的 并且(and)!
student_list = Student.query.filter(Student.age>15, Student.sex==True).all()
print(student_list)

filter_by精确等值查询
filter_by() 只支持字段值是否相等的精确查询,不支持大于、小于等其他比较操作。
# 单条件格式:filter_by(字段=值)
# 多条件格式:filter_by(字段=值, 字段=值, 字段=值...)
student_list = Student.query.filter_by(age=16).all() # 字段添加不需要附带模型类
print(student_list)

由于表中没有 age=16 的记录,因此返回空列表。
逻辑查询
逻辑与 (and)
默认情况下,在 filter() 中用逗号分隔多个条件即表示“与”关系。也可以显式使用 and_()。
# 默认方式:查询年龄大于10的男生
student_list = Student.query.filter(Student.age>10, Student.sex==True).all()
print("第一次", student_list)
# 显式使用and_:filter(and_(条件1,条件2,....)) 等价于 filter(条件1,条件2,.....)
from sqlalchemy import and_
student_list = Student.query.filter(and_(Student.age>10, Student.sex==True)).all()
print("第二次", student_list)

逻辑或 (or)
逻辑或查询需要导入 or_。
# 查询年龄大于11或者钱包余额大于1500的女生
from sqlalchemy import or_
student_list = Student.query.filter(or_(Student.age>11, Student.money>1500), Student.sex==False).all()
print("第一次", student_list)
# 更复杂的组合:查询 (年龄大于11的男生) 或 (钱包余额大于1500的女生)
from sqlalchemy import or_, and_
student_list = Student.query.filter(
or_(
and_(Student.age>11, Student.sex==True),
and_(Student.money>1500, Student.sex==False)
)
).all()
print("第二次", student_list)

逻辑非 (not)
可以使用 != 运算符或 not_() 函数实现非查询。
# 方式一:使用 !=
student = Student.query.filter(Student.name != '小白').all()
print("第一次", student)
# 方式二:使用 not_ 取反
from sqlalchemy import not_
student = Student.query.filter(not_(Student.name == '小白')).all()
print("第二次", student)

范围查询 (in_)
student_list = Student.query.filter(Student.id.in_([1, 11, 4, 5])).all()
print(student_list)

空值判断 (is_)
# 查询邮箱为Null的用户
student_list = Student.query.filter(Student.email.is_(None)).all()
print(student_list)
"""判断记录是否存在"""
# 方法一:使用exists()和scalar()
query = Student.query.filter(Student.name == "小辉").exists()
ret = db.session.query(query).scalar() # 通过scalar()获取布尔值
print("第一次是否存在", ret)
# 方法二:查询后判断
student = Student.query.filter(Student.name=="小黑").first()
print("第二次是否存在", bool(student))

结果排序与分页
order_by 排序
# 按年龄降序排序
student_list = Student.query.order_by(Student.age.desc()).all()
print(student_list)
# 按钱包余额降序排序,如果余额一致,再按id降序排序
student_list = Student.query.order_by(Student.money.desc(), Student.id.desc()).all()
print(student_list)

limit 与 offset 分页限制
limit() 和 offset() 用于对查询结果进行数量限制和偏移,常用于分页。
# 查询年龄最大的三个人
student_list = Student.query.order_by(Student.age.desc()).limit(3).all()
print(student_list)
# 查询钱包余额最少的三个人
student_list = Student.query.order_by(Student.money.asc()).limit(3).all()
print(student_list)
# 按钱包余额降序排列,查询出排名第4到第5的学生(offset+limit实现)
student_list = Student.query.order_by(Student.money.desc()).offset(3).limit(2).all()
print(student_list)
# 也可以对all()返回的列表使用切片(在内存中处理,数据量大时不推荐)
student_list = Student.query.order_by(Student.money.desc()).all()[3:5]
print(student_list)

完整示例代码
以下是一个集成了上述所有查询方法的完整 Flask 应用示例,使用 MySQL 作为数据库。
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 连接数据库
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:password@127.0.0.1:3306/flask-test?charset=utf8mb4"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_ECHO"] = True # 显示SQL语句
db = SQLAlchemy()
db.init_app(app)
class Student(db.Model):
"""学生信息模型"""
__tablename__ = "tb_student2"
id = db.Column(db.Integer, primary_key=True, comment="主键")
name = db.Column(db.String(15), index=True, comment="姓名")
age = db.Column(db.SmallInteger, comment="年龄")
sex = db.Column(db.Boolean, default=True, comment="性别")
email = db.Column(db.String(128), unique=True, comment="邮箱地址")
money = db.Column(db.Numeric(10, 2), default=0.0, comment="钱包")
def __repr__(self):
return f"{self.name}<{self.__class__.__name__}>"
@app.route("/get")
def get():
"""在此处替换为上文中的各种查询示例代码"""
# 例如:查询钱包余额最少的三个人
student_list = Student.query.order_by(Student.money.asc()).limit(3).all()
print(student_list)
return "ok"
if __name__ == '__main__':
with app.app_context():
db.create_all() # 创建数据表(如果不存在)
app.run(debug=True)
总结
本文系统性地介绍了 Flask-SQLAlchemy 中几乎所有的数据查询方法,从最基本的 get()、all(),到条件过滤 filter()、filter_by(),再到复杂的逻辑组合、空值判断、排序和分页。掌握这些查询技巧,能够让你在 Flask Web 开发中高效、灵活地操作数据库。希望这份详细的解析能对你有所帮助。想了解更多技术实战内容,欢迎访问云栈社区。