

架构决定了代码的寿命,而模式决定了架构的质量
你是否曾回头审视自己半年前写的Python项目,然后心想:
这堆垃圾是谁写的?哦等等…好像是我自己。
别担心,每个真正的开发者都有这样的时刻。问题不在于你的编程技巧,而在于你的架构。而架构,正是90%技术债务的诞生地。
经过四年多构建从“可爱小脚本”到“为什么这东西要吃12GB内存?”的系统后,我学到了一个痛苦的教训:
语法拯救不了项目,模式才能。
今天,云朵君要分享给你10个Python模式,在构建任何像样的“大项目”之前,你应该把它们刻在脑子里。
这些不是你在每个YouTube教程里看到的基础模式。这些是我多么希望有人在我写下4万行意大利面条式代码之前,就甩在我脸上的模式。
引言:为什么你需要这些模式?
想象一下这个场景:你的项目从简单的脚本成长为有多个模块、需要团队协作的完整应用。突然之间:
- 添加新功能需要修改5个不同的文件
- 测试变得异常困难
- 简单的修改引发意想不到的连锁反应
- 新同事需要一周才能理解你的代码结构
这就是缺乏设计模式的代价。而下面的10个模式,正是解决这些问题的钥匙。
1. 依赖注入:告别硬编码的紧耦合
大多数开发者不用依赖注入,因为他们觉得“Python是动态语言嘛”。确实…直到你尝试测试一个自己实例化数据库连接的类。
为什么重要?
- 实现真正的解耦
- 便于单元测试(可以轻松模拟依赖)
- 更容易替换实现而不重写逻辑
# 错误的做法:硬编码依赖
class BadUserManager:
def __init__(self):
self.email_service = EmailService() # 硬编码!
def register(self, user):
# 业务逻辑
self.email_service.send(user, "Welcome!")
# 正确的做法:依赖注入
class EmailService:
def send(self, to, message):
print(f"发送邮件给 {to}: {message}")
class UserManager:
def __init__(self, email_service): # 依赖作为参数传入
self.email_service = email_service
def register(self, user):
# 业务逻辑
self.email_service.send(user, "欢迎!")
# 使用
email_service = EmailService()
manager = UserManager(email_service) # 注入依赖
manager.register("test@example.com")
# 测试时可以轻松模拟
class MockEmailService:
def send(self, to, message):
print(f"[测试] 模拟发送给 {to}")
test_manager = UserManager(MockEmailService())
test_manager.register("test@example.com")
实战技巧:在FastAPI或Django中,依赖注入常用于数据库会话、配置等。试试看,你的测试代码会变得多么干净!
2. 策略模式:告别if/else的丛林
如果你的代码有一个巨大的决策树,恭喜你——你创造了一个怪物。策略模式用清晰、可互换的行为替换了这团乱麻。
# 常见反模式:if/else地狱
def process_payment(amount, payment_method):
if payment_method == "paypal":
# 处理PayPal支付
pass
elif payment_method == "stripe":
# 处理Stripe支付
pass
elif payment_method == "alipay":
# 处理支付宝支付
pass
# ... 更多elif
# 优雅的解决方案:策略模式
from abc import ABC, abstractmethod
class PaymentStrategy(ABC):
"""支付策略抽象基类"""
@abstractmethod
def pay(self, amount):
pass
class PayPalPayment(PaymentStrategy):
def pay(self, amount):
print(f"通过PayPal支付 {amount} 元")
# 实际的PayPal API调用
return True
class StripePayment(PaymentStrategy):
def pay(self, amount):
print(f"通过Stripe支付 {amount} 元")
# 实际的Stripe API调用
return True
class AlipayPayment(PaymentStrategy):
def pay(self, amount):
print(f"通过支付宝支付 {amount} 元")
# 实际的支付宝API调用
return True
class PaymentProcessor:
def __init__(self):
self.strategies = {
"paypal": PayPalPayment(),
"stripe": StripePayment(),
"alipay": AlipayPayment()
}
def process(self, method, amount):
strategy = self.strategies.get(method)
if not strategy:
raise ValueError(f"不支持的支付方式: {method}")
return strategy.pay(amount)
# 使用
processor = PaymentProcessor()
processor.process("paypal", 100)
processor.process("alipay", 200)
# 添加新支付方式非常容易
class WeChatPayment(PaymentStrategy):
def pay(self, amount):
print(f"通过微信支付 {amount} 元")
return True
processor.strategies["wechat"] = WeChatPayment()
设计优势:新增支付方式只需添加新类,无需修改现有代码——这符合开闭原则。
3. 建造者模式:当构造函数变得臃肿时
曾经有一个类有九个可选参数吗?是的。那不是构造函数——那是人质劫持现场。
# 问题:太多参数的构造函数
class User:
def __init__(self, name, email=None, phone=None, age=None,
address=None, city=None, country=None,
is_active=True, is_admin=False):
# 初始化所有属性...
pass
# 使用起来很痛苦
user = User(
name="张三",
email="zhangsan@example.com",
phone="13800138000",
age=25,
address="某街道123号",
city="北京",
country="中国",
is_active=True,
is_admin=False
)
# 解决方案:建造者模式
class QueryBuilder:
def __init__(self, table="users"):
self.table = table
self._select_fields = ["*"]
self._where_conditions = []
self._order_by_fields = []
self._limit_value = None
def select(self, *fields):
self._select_fields = fields
return self
def where(self, condition):
self._where_conditions.append(condition)
return self
def order_by(self, field, descending=False):
direction = "DESC" if descending else "ASC"
self._order_by_fields.append(f"{field} {direction}")
return self
def limit(self, count):
self._limit_value = count
return self
def build(self):
# 构建SELECT部分
select_clause = f"SELECT {', '.join(self._select_fields)} FROM {self.table}"
# 构建WHERE部分
where_clause = ""
if self._where_conditions:
where_clause = " WHERE " + " AND ".join(self._where_conditions)
# 构建ORDER BY部分
order_clause = ""
if self._order_by_fields:
order_clause = " ORDER BY " + ", ".join(self._order_by_fields)
# 构建LIMIT部分
limit_clause = ""
if self._limit_value:
limit_clause = f" LIMIT {self._limit_value}"
return select_clause + where_clause + order_clause + limit_clause
# 流畅的API调用
query = (
QueryBuilder("users")
.select("id", "name", "email")
.where("age > 18")
.where("is_active = TRUE")
.order_by("created_at", descending=True)
.limit(10)
.build()
)
print(query)
# 输出: SELECT id, name, email FROM users WHERE age > 18 AND is_active = TRUE ORDER BY created_at DESC LIMIT 10
现代应用:SQLAlchemy的查询接口、Django的Q对象、Pydantic的模型配置都使用了类似的建造者模式。
4. 事件驱动模式:当应用需要应对高并发时
大型系统崩溃往往因为一切依赖于一切。解决方案是:事件,而不是链式调用。
class EventBus:
"""事件总线 - 组件通信的中枢"""
def __init__(self):
self._subscribers = {}
def subscribe(self, event_type, callback):
"""订阅事件"""
if event_type not in self._subscribers:
self._subscribers[event_type] = []
if callback not in self._subscribers[event_type]:
self._subscribers[event_type].append(callback)
def unsubscribe(self, event_type, callback):
"""取消订阅"""
if event_type in self._subscribers:
self._subscribers[event_type].remove(callback)
def publish(self, event_type, data=None):
"""发布事件"""
if event_type in self._subscribers:
for callback in self._subscribers[event_type]:
try:
callback(data)
except Exception as e:
print(f"事件处理出错: {e}")
def get_subscriber_count(self, event_type):
"""获取订阅者数量"""
return len(self._subscribers.get(event_type, []))
# 定义事件类型
USER_REGISTERED = "user_registered"
ORDER_CREATED = "order_created"
PAYMENT_SUCCESS = "payment_success"
# 创建事件总线实例
bus = EventBus()
# 事件处理器
def send_welcome_email(user_data):
print(f"[邮件服务] 发送欢迎邮件给 {user_data.get('email')}")
def init_user_storage(user_data):
print(f"[存储服务] 为用户 {user_data.get('username')} 初始化存储空间")
def log_user_registration(user_data):
print(f"[日志服务] 用户注册: {user_data}")
# 订阅事件
bus.subscribe(USER_REGISTERED, send_welcome_email)
bus.subscribe(USER_REGISTERED, init_user_storage)
bus.subscribe(USER_REGISTERED, log_user_registration)
# 发布事件
new_user = {
"id": 123,
"username": "zhangsan",
"email": "zhangsan@example.com",
"registered_at": "2024-01-01 10:00:00"
}
print("用户注册事件触发...")
bus.publish(USER_REGISTERED, new_user)
print(f"事件处理完成,共有{bus.get_subscriber_count(USER_REGISTERED)}个订阅者")
应用场景:微服务通信、插件系统、GUI应用的事件处理。这种模式比治疗更能解耦有毒的关系。
5. 仓储模式:阻止SQL泄漏到业务逻辑中
如果SQL查询生活在你的业务逻辑中,那么你正在邀请混乱。
import sqlite3
from contextlib import contextmanager
from typing import List, Optional, Dict, Any
@contextmanager
def get_db_connection():
"""数据库连接上下文管理器"""
conn = sqlite3.connect(":memory:") # 使用内存数据库示例
conn.row_factory = sqlite3.Row # 返回字典-like的行
try:
yield conn
finally:
conn.close()
class UserRepository:
"""用户数据仓储"""
def __init__(self, db_connection):
self.db = db_connection
self._create_table()
def _create_table(self):
"""创建用户表"""
self.db.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
age INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
self.db.commit()
def get_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
"""根据ID获取用户"""
cursor = self.db.execute(
"SELECT * FROM users WHERE id = ?",
(user_id,)
)
row = cursor.fetchone()
return dict(row) if row else None
def get_by_email(self, email: str) -> Optional[Dict[str, Any]]:
"""根据邮箱获取用户"""
cursor = self.db.execute(
"SELECT * FROM users WHERE email = ?",
(email,)
)
row = cursor.fetchone()
return dict(row) if row else None
def get_all(self, limit: int = 100) -> List[Dict[str, Any]]:
"""获取所有用户"""
cursor = self.db.execute(
"SELECT * FROM users LIMIT ?",
(limit,)
)
return [dict(row) for row in cursor.fetchall()]
def add(self, username: str, email: str, age: int = None) -> int:
"""添加用户"""
cursor = self.db.execute(
"INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
(username, email, age)
)
self.db.commit()
return cursor.lastrowid
def update(self, user_id: int, **kwargs) -> bool:
"""更新用户信息"""
if not kwargs:
return False
set_clause = ", ".join([f"{k} = ?" for k in kwargs.keys()])
values = list(kwargs.values())
values.append(user_id)
self.db.execute(
f"UPDATE users SET {set_clause} WHERE id = ?",
values
)
self.db.commit()
return True
def delete(self, user_id: int) -> bool:
"""删除用户"""
cursor = self.db.execute(
"DELETE FROM users WHERE id = ?",
(user_id,)
)
self.db.commit()
return cursor.rowcount > 0
# 使用示例
with get_db_connection() as conn:
repo = UserRepository(conn)
# 添加用户
user_id = repo.add("zhangsan", "zhangsan@example.com", 25)
print(f"添加用户成功,ID: {user_id}")
# 查询用户
user = repo.get_by_id(user_id)
print(f"查询用户: {user}")
# 更新用户
repo.update(user_id, age=26, email="new_email@example.com")
# 再次查询
updated_user = repo.get_by_id(user_id)
print(f"更新后用户: {updated_user}")
# 获取所有用户
all_users = repo.get_all()
print(f"总用户数: {len(all_users)}")
架构优势:更换数据库?只需更新一个文件。这就是架构的力量。
6. 映射器模式:你的数据 ≠ 你的对象
将数据结构映射到Python对象可以给你一致性,并消除"字典末日"问题。
from datetime import datetime
from typing import Optional
class User:
"""领域模型:用户实体"""
def __init__(self, id: int, username: str, email: str,
age: Optional[int] = None, created_at: Optional[datetime] = None):
self.id = id
self.username = username
self.email = email
self.age = age
self.created_at = created_at or datetime.now()
def __repr__(self):
return f"User(id={self.id}, username='{self.username}', email='{self.email}')"
def is_adult(self) -> bool:
return self.age is not None and self.age >= 18
def get_display_name(self) -> str:
return f"{self.username} ({self.email})"
class UserMapper:
"""用户映射器:在领域模型和数据模型之间转换"""
@staticmethod
def from_dict(data: dict) -> User:
"""从字典创建User对象"""
return User(
id=data.get("id"),
username=data["username"],
email=data["email"],
age=data.get("age"),
created_at=datetime.fromisoformat(data["created_at"]) if "created_at" in data else None
)
@staticmethod
def from_database_row(row) -> User:
"""从数据库行创建User对象"""
return User(
id=row["id"],
username=row["username"],
email=row["email"],
age=row["age"],
created_at=datetime.fromisoformat(row["created_at"]) if row["created_at"] else None
)
@staticmethod
def to_dict(user: User) -> dict:
"""将User对象转为字典"""
return {
"id": user.id,
"username": user.username,
"email": user.email,
"age": user.age,
"created_at": user.created_at.isoformat() if user.created_at else None,
"is_adult": user.is_adult(),
"display_name": user.get_display_name()
}
@staticmethod
def to_database_params(user: User) -> tuple:
"""将User对象转为数据库参数"""
return (
user.username,
user.email,
user.age,
user.created_at.isoformat() if user.created_at else None
)
# 示例:从API JSON到领域对象
api_response = {
"id": 123,
"username": "zhangsan",
"email": "zhangsan@example.com",
"age": 25,
"created_at": "2024-01-01T10:00:00"
}
# 映射到领域对象
user = UserMapper.from_dict(api_response)
print(f"领域对象: {user}")
print(f"是否成人: {user.is_adult()}")
print(f"显示名称: {user.get_display_name()}")
# 映射回字典
user_dict = UserMapper.to_dict(user)
print(f"回传字典: {user_dict}")
# 示例:从数据库到领域对象
class MockDBRow:
"""模拟数据库行"""
def __init__(self):
self.data = {
"id": 456,
"username": "lisi",
"email": "lisi@example.com",
"age": 17,
"created_at": "2024-01-02T09:00:00"
}
def __getitem__(self, key):
return self.data[key]
db_row = MockDBRow()
user_from_db = UserMapper.from_database_row(db_row)
print(f"\n从数据库创建的用户: {user_from_db}")
print(f"是否成人: {user_from_db.is_adult()}")
API给你JSON,你的代码值得拥有结构。
7. 管道模式:当任务需要流畅执行时
完美适用于机器学习预处理、ETL,或任何你的逻辑有多个阶段的情况。
from abc import ABC, abstractmethod
from typing import Any, List
class PipelineStep(ABC):
"""管道步骤抽象基类"""
@abstractmethod
def process(self, data: Any) -> Any:
pass
def __call__(self, data: Any) -> Any:
return self.process(data)
class TextCleanStep(PipelineStep):
"""文本清洗步骤"""
def process(self, text: str) -> str:
# 移除多余空格
text = ' '.join(text.split())
# 转换为小写
text = text.lower()
# 移除特殊字符(简单示例)
text = ''.join(c for c in text if c.isalnum() or c.isspace())
return text
class TokenizeStep(PipelineStep):
"""分词步骤"""
def __init__(self, tokenizer=None):
self.tokenizer = tokenizer or str.split
def process(self, text: str) -> List[str]:
return self.tokenizer(text)
class StopwordRemovalStep(PipelineStep):
"""停用词移除步骤"""
def __init__(self, stopwords=None):
self.stopwords = stopwords or {"a", "an", "the", "and", "or", "but", "in", "on", "at"}
def process(self, tokens: List[str]) -> List[str]:
return [token for token in tokens if token not in self.stopwords]
class StemmingStep(PipelineStep):
"""词干提取步骤(简化版)"""
def process(self, tokens: List[str]) -> List[str]:
# 简化版的词干提取
stemmed = []
for token in tokens:
if token.endswith("ing"):
token = token[:-3]
elif token.endswith("ed"):
token = token[:-2]
elif token.endswith("s"):
token = token[:-1]
stemmed.append(token)
return stemmed
class Pipeline:
"""管道:连接多个处理步骤"""
def __init__(self, steps: List[PipelineStep] = None):
self.steps = steps or []
def add_step(self, step: PipelineStep):
"""添加处理步骤"""
self.steps.append(step)
return self # 支持链式调用
def process(self, data: Any) -> Any:
"""处理数据"""
result = data
for step in self.steps:
print(f"执行步骤: {step.__class__.__name__}")
result = step.process(result)
print(f"当前结果: {result}")
return result
def __call__(self, data: Any) -> Any:
return self.process(data)
# 构建文本处理管道
text_pipeline = Pipeline()
text_pipeline.add_step(TextCleanStep())
text_pipeline.add_step(TokenizeStep())
text_pipeline.add_step(StopwordRemovalStep())
text_pipeline.add_step(StemmingStep())
# 处理文本
sample_text = " I am RUNNING in the park and playing with dogs! "
print("原始文本:", sample_text)
print("\n开始管道处理...")
final_result = text_pipeline.process(sample_text)
print(f"\n最终结果: {final_result}")
# 扩展:可以轻松添加新步骤
class SpellCheckStep(PipelineStep):
"""拼写检查步骤(示例)"""
def process(self, tokens: List[str]) -> List[str]:
# 这里可以实现实际的拼写检查逻辑
corrected = []
for token in tokens:
# 简单示例:修正常见拼写错误
if token == "runn":
token = "run"
corrected.append(token)
return corrected
# 添加新步骤到现有管道
text_pipeline.add_step(SpellCheckStep())
new_result = text_pipeline.process("I am runn in the parc")
print(f"\n包含拼写检查的结果: {new_result}")
管道保持你的大脑——和你的项目——整洁有序。
8. 命令模式:撤销、重做和"再来一次"
如果你的项目需要可逆操作,这是你的救星。
from abc import ABC, abstractmethod
from typing import List, Dict, Any
class Command(ABC):
"""命令抽象基类"""
@abstractmethod
def execute(self) -> Any:
"""执行命令"""
pass
@abstractmethod
def undo(self) -> Any:
"""撤销命令"""
pass
def __str__(self):
return self.__class__.__name__
class TextEditor:
"""文本编辑器(接收者)"""
def __init__(self):
self.text = ""
self.history: List[str] = []
def add_text(self, new_text: str):
"""添加文本"""
self.history.append(self.text)
self.text += new_text
print(f"添加文本: '{new_text}'")
print(f"当前文本: '{self.text}'")
def delete_last_word(self):
"""删除最后一个单词"""
if self.text:
self.history.append(self.text)
words = self.text.split()
if words:
last_word = words[-1]
self.text = ' '.join(words[:-1])
print(f"删除最后一个单词: '{last_word}'")
print(f"当前文本: '{self.text}'")
return last_word
return ""
def clear_text(self):
"""清空文本"""
self.history.append(self.text)
self.text = ""
print("清空文本")
def restore_from_history(self):
"""从历史恢复"""
if self.history:
self.text = self.history.pop()
print(f"恢复到: '{self.text}'")
class AddTextCommand(Command):
"""添加文本命令"""
def __init__(self, editor: TextEditor, text: str):
self.editor = editor
self.text = text
self.previous_text = ""
def execute(self) -> None:
self.previous_text = self.editor.text
self.editor.add_text(self.text)
def undo(self) -> None:
self.editor.text = self.previous_text
print(f"撤销添加文本,恢复为: '{self.editor.text}'")
class DeleteLastWordCommand(Command):
"""删除最后一个单词命令"""
def __init__(self, editor: TextEditor):
self.editor = editor
self.deleted_word = ""
self.previous_text = ""
def execute(self) -> None:
self.previous_text = self.editor.text
self.deleted_word = self.editor.delete_last_word()
def undo(self) -> None:
self.editor.text = self.previous_text
print(f"撤销删除,恢复为: '{self.editor.text}'")
class ClearTextCommand(Command):
"""清空文本命令"""
def __init__(self, editor: TextEditor):
self.editor = editor
self.previous_text = ""
def execute(self) -> None:
self.previous_text = self.editor.text
self.editor.clear_text()
def undo(self) -> None:
self.editor.text = self.previous_text
print(f"撤销清空,恢复为: '{self.editor.text}'")
class CommandHistory:
"""命令历史管理器"""
def __init__(self):
self.history: List[Command] = []
self.undo_history: List[Command] = []
def execute_command(self, command: Command):
"""执行命令并记录"""
command.execute()
self.history.append(command)
self.undo_history.clear() # 新的执行清除重做历史
print(f"执行命令: {command}")
def undo(self):
"""撤销上一个命令"""
if self.history:
command = self.history.pop()
command.undo()
self.undo_history.append(command)
print(f"撤销命令: {command}")
else:
print("没有可撤销的命令")
def redo(self):
"""重做上一个撤销的命令"""
if self.undo_history:
command = self.undo_history.pop()
command.execute()
self.history.append(command)
print(f"重做命令: {command}")
else:
print("没有可重做的命令")
def get_history(self):
"""获取命令历史"""
return [str(cmd) for cmd in self.history]
# 使用示例
editor = TextEditor()
history = CommandHistory()
print("=== 文本编辑器演示 ===\n")
# 执行一系列命令
add_hello = AddTextCommand(editor, "Hello ")
history.execute_command(add_hello)
add_world = AddTextCommand(editor, "World ")
history.execute_command(add_world)
add_python = AddTextCommand(editor, "Python ")
history.execute_command(add_python)
print(f"\n当前文本: '{editor.text}'")
print(f"命令历史: {history.get_history()}")
# 撤销操作
print("\n--- 执行撤销 ---")
history.undo()
print(f"撤销后文本: '{editor.text}'")
history.undo()
print(f"再撤销后文本: '{editor.text}'")
# 重做操作
print("\n--- 执行重做 ---")
history.redo()
print(f"重做后文本: '{editor.text}'")
# 执行删除命令
print("\n--- 执行删除 ---")
delete_cmd = DeleteLastWordCommand(editor)
history.execute_command(delete_cmd)
# 执行清空命令
print("\n--- 执行清空 ---")
clear_cmd = ClearTextCommand(editor)
history.execute_command(clear_cmd)
# 多次撤销
print("\n--- 多次撤销 ---")
for _ in range(3):
history.undo()
print(f"\n最终文本: '{editor.text}'")
print(f"最终历史: {history.get_history()}")
UI应用、CLI工具、编辑器——这种模式无处不在。
9. 规范模式:不会腐烂的过滤逻辑
当过滤器变得复杂时,规范模式保持其优雅。
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from datetime import datetime, timedelta
class Specification(ABC):
"""规范抽象基类"""
@abstractmethod
def is_satisfied(self, item: Dict[str, Any]) -> bool:
"""检查项是否满足规范"""
pass
def __and__(self, other: 'Specification') -> 'AndSpecification':
"""与操作"""
return AndSpecification(self, other)
def __or__(self, other: 'Specification') -> 'OrSpecification':
"""或操作"""
return OrSpecification(self, other)
def __invert__(self) -> 'NotSpecification':
"""非操作"""
return NotSpecification(self)
def __call__(self, item: Dict[str, Any]) -> bool:
"""使规范可调用"""
return self.is_satisfied(item)
class AgeAboveSpecification(Specification):
"""年龄大于规范"""
def __init__(self, min_age: int):
self.min_age = min_age
def is_satisfied(self, user: Dict[str, Any]) -> bool:
age = user.get('age')
return age is not None and age > self.min_age
def __str__(self):
return f"Age > {self.min_age}"
class HasEmailSpecification(Specification):
"""有邮箱规范"""
def is_satisfied(self, user: Dict[str, Any]) -> bool:
email = user.get('email')
return bool(email and '@' in email)
def __str__(self):
return "Has Email"
class IsActiveSpecification(Specification):
"""活跃用户规范"""
def __init__(self, days_threshold: int = 30):
self.days_threshold = days_threshold
def is_satisfied(self, user: Dict[str, Any]) -> bool:
last_login = user.get('last_login')
if not last_login:
return False
if isinstance(last_login, str):
last_login = datetime.fromisoformat(last_login)
days_since_login = (datetime.now() - last_login).days
return days_since_login <= self.days_threshold
def __str__(self):
return f"Active within {self.days_threshold} days"
class BalanceAboveSpecification(Specification):
"""余额大于规范"""
def __init__(self, min_balance: float):
self.min_balance = min_balance
def is_satisfied(self, user: Dict[str, Any]) -> bool:
balance = user.get('balance', 0)
return balance >= self.min_balance
def __str__(self):
return f"Balance >= {self.min_balance}"
class AndSpecification(Specification):
"""与规范"""
def __init__(self, *specs: Specification):
self.specs = specs
def is_satisfied(self, item: Dict[str, Any]) -> bool:
return all(spec(item) for spec in self.specs)
def __str__(self):
return " AND ".join(str(spec) for spec in self.specs)
class OrSpecification(Specification):
"""或规范"""
def __init__(self, *specs: Specification):
self.specs = specs
def is_satisfied(self, item: Dict[str, Any]) -> bool:
return any(spec(item) for spec in self.specs)
def __str__(self):
return " OR ".join(str(spec) for spec in self.specs)
class NotSpecification(Specification):
"""非规范"""
def __init__(self, spec: Specification):
self.spec = spec
def is_satisfied(self, item: Dict[str, Any]) -> bool:
return not self.spec(item)
def __str__(self):
return f"NOT ({self.spec})"
class UserFilter:
"""用户过滤器"""
@staticmethod
def filter_users(users: List[Dict[str, Any]], spec: Specification) -> List[Dict[str, Any]]:
"""根据规范过滤用户"""
return [user for user in users if spec.is_satisfied(user)]
@staticmethod
def count_users(users: List[Dict[str, Any]], spec: Specification) -> int:
"""统计满足规范的用户数"""
return sum(1 for user in users if spec.is_satisfied(user))
# 示例数据
users = [
{"id": 1, "name": "张三", "age": 25, "email": "zhangsan@example.com", "balance": 1000.0, "last_login": "2024-01-15T10:00:00", "is_active": True},
{"id": 2, "name": "李四", "age": 17, "email": "lisi@example.com", "balance": 500.0, "last_login": "2023-11-01T09:00:00", "is_active": True},
{"id": 3, "name": "王五", "age": 30, "email": "", "balance": 2000.0, "last_login": "2024-01-20T14:00:00", "is_active": False},
{"id": 4, "name": "赵六", "age": 22, "email": "zhaoliu@example.com", "balance": 300.0, "last_login": "2024-01-18T16:00:00", "is_active": True},
{"id": 5, "name": "孙七", "age": 35, "email": "sunqi@example.com", "balance": 1500.0, "last_login": "2023-12-01T08:00:00", "is_active": True},
]
print("=== 用户过滤系统 ===\n")
# 基本规范
adult_spec = AgeAboveSpecification(18)
has_email_spec = HasEmailSpecification()
active_spec = IsActiveSpecification(days_threshold=30)
rich_spec = BalanceAboveSpecification(1000)
print("1. 成年用户:")
adults = UserFilter.filter_users(users, adult_spec)
print(f" 规范: {adult_spec}")
print(f" 数量: {len(adults)}")
for user in adults:
print(f" - {user['name']} ({user['age']}岁)")
print("\n2. 有邮箱的活跃成年用户:")
complex_spec = adult_spec & has_email_spec & active_spec
filtered = UserFilter.filter_users(users, complex_spec)
print(f" 规范: {complex_spec}")
print(f" 数量: {len(filtered)}")
for user in filtered:
print(f" - {user['name']}")
print("\n3. 有钱或活跃的用户:")
rich_or_active = rich_spec | active_spec
filtered = UserFilter.filter_users(users, rich_or_active)
print(f" 规范: {rich_or_active}")
print(f" 数量: {len(filtered)}")
for user in filtered:
print(f" - {user['name']} (余额: {user['balance']}, 最近登录: {user['last_login'][:10]})")
print("\n4. 非活跃用户:")
not_active = ~active_spec
filtered = UserFilter.filter_users(users, not_active)
print(f" 规范: {not_active}")
print(f" 数量: {len(filtered)}")
for user in filtered:
print(f" - {user['name']} (最后登录: {user['last_login'][:10]})")
print("\n5. 动态构建的复杂查询:")
# 查找:成年、有邮箱、余额>500,且(是活跃用户或余额>1500)
dynamic_spec = (
adult_spec &
has_email_spec &
BalanceAboveSpecification(500) &
(active_spec | BalanceAboveSpecification(1500))
)
print(f" 规范: {dynamic_spec}")
count = UserFilter.count_users(users, dynamic_spec)
print(f" 满足条件的用户数: {count}")
filtered = UserFilter.filter_users(users, dynamic_spec)
for user in filtered:
print(f" - {user['name']}: 年龄{user['age']}, 余额{user['balance']}, 活跃{user['is_active']}")
向迷宫般的过滤逻辑说再见。
10. 注册表模式:动态插件的智能方式
如果你的"插件系统"使用if name == 'plugin_a':,那么你并没有插件系统。
from abc import ABC, abstractmethod
from typing import Dict, Any, Callable, Optional, Type
import importlib
import pkgutil
class Plugin(ABC):
"""插件基类"""
@abstractmethod
def execute(self, *args, **kwargs) -> Any:
"""执行插件"""
pass
def get_name(self) -> str:
"""获取插件名称"""
return self.__class__.__name__
def get_version(self) -> str:
"""获取插件版本"""
return "1.0.0"
class PluginRegistry:
"""插件注册表"""
_plugins: Dict[str, Type[Plugin]] = {}
_instances: Dict[str, Plugin] = {}
@classmethod
def register(cls, name: Optional[str] = None):
"""注册插件装饰器"""
def decorator(plugin_cls: Type[Plugin]) -> Type[Plugin]:
plugin_name = name or plugin_cls.__name__.lower()
if plugin_name in cls._plugins:
raise ValueError(f"插件 '{plugin_name}' 已注册")
cls._plugins[plugin_name] = plugin_cls
print(f"注册插件: {plugin_name} -> {plugin_cls.__name__}")
return plugin_cls
return decorator
@classmethod
def get_plugin(cls, name: str) -> Optional[Plugin]:
"""获取插件实例"""
if name not in cls._instances:
if name in cls._plugins:
cls._instances[name] = cls._plugins[name]()
else:
return None
return cls._instances.get(name)
@classmethod
def get_all_plugins(cls) -> Dict[str, Plugin]:
"""获取所有插件实例"""
# 确保所有插件都已实例化
for name in cls._plugins:
if name not in cls._instances:
cls._instances[name] = cls._plugins[name]()
return cls._instances.copy()
@classmethod
def list_plugins(cls) -> list:
"""列出所有注册的插件"""
return list(cls._plugins.keys())
@classmethod
def execute_plugin(cls, name: str, *args, **kwargs) -> Any:
"""执行指定插件"""
plugin = cls.get_plugin(name)
if not plugin:
raise ValueError(f"未找到插件: {name}")
print(f"执行插件: {name}")
return plugin.execute(*args, **kwargs)
@classmethod
def load_plugins_from_package(cls, package_name: str):
"""从包中自动加载插件"""
try:
package = importlib.import_module(package_name)
# 遍历包中的所有模块
for _, module_name, is_pkg in pkgutil.iter_modules(package.__path__):
if not is_pkg:
full_module_name = f"{package_name}.{module_name}"
importlib.import_module(full_module_name)
print(f"从包 '{package_name}' 加载插件完成")
except ImportError as e:
print(f"加载包 '{package_name}' 失败: {e}")
# 定义一些插件
@PluginRegistry.register("greeter")
class GreeterPlugin(Plugin):
"""问候插件"""
def execute(self, name: str = "World") -> str:
greeting = f"Hello, {name}!"
print(greeting)
return greeting
def get_version(self) -> str:
return "2.0.0"
@PluginRegistry.register("calculator")
class CalculatorPlugin(Plugin):
"""计算器插件"""
def execute(self, operation: str, a: float, b: float) -> float:
operations = {
"add": lambda x, y: x + y,
"subtract": lambda x, y: x - y,
"multiply": lambda x, y: x * y,
"divide": lambda x, y: x / y if y != 0 else float('inf')
}
if operation not in operations:
raise ValueError(f"不支持的操作: {operation}")
result = operations[operation](a, b)
print(f"{a}{operation}{b} = {result}")
return result
@PluginRegistry.register("uppercase")
class UppercasePlugin(Plugin):
"""大写转换插件"""
def execute(self, text: str) -> str:
result = text.upper()
print(f"大写转换: '{text}' -> '{result}'")
return result
@PluginRegistry.register() # 使用默认名称
class ReversePlugin(Plugin):
"""字符串反转插件"""
def execute(self, text: str) -> str:
result = text[::-1]
print(f"反转: '{text}' -> '{result}'")
return result
# 动态插件注册
def register_dynamic_plugin():
"""动态注册插件(例如从配置文件加载)"""
class DynamicPlugin(Plugin):
def __init__(self, name, func):
self._name = name
self._func = func
def execute(self, *args, **kwargs):
print(f"执行动态插件: {self._name}")
return self._func(*args, **kwargs)
def get_name(self):
return self._name
# 动态创建和注册插件
def custom_function(x):
return x * 2
# 动态注册
plugin_name = "dynamic_doubler"
PluginRegistry._plugins[plugin_name] = type(
"DynamicDoubler",
(Plugin,),
{
"execute": lambda self, x: custom_function(x),
"get_name": lambda self: plugin_name
}
)
print(f"动态注册插件: {plugin_name}")
# 使用示例
print("=== 插件系统演示 ===\n")
# 列出所有插件
print("已注册的插件:")
for plugin_name in PluginRegistry.list_plugins():
print(f" - {plugin_name}")
print("\n1. 执行问候插件:")
PluginRegistry.execute_plugin("greeter", "Python开发者")
print("\n2. 执行计算器插件:")
result = PluginRegistry.execute_plugin("calculator", "multiply", 6, 7)
print(f" 结果: {result}")
print("\n3. 执行大写插件:")
PluginRegistry.execute_plugin("uppercase", "hello world")
print("\n4. 执行反转插件:")
PluginRegistry.execute_plugin("reverseplugin", "Python") # 使用默认名称
print("\n5. 动态插件演示:")
register_dynamic_plugin()
# 执行动态插件
if "dynamic_doubler" in PluginRegistry.list_plugins():
result = PluginRegistry.execute_plugin("dynamic_doubler", 21)
print(f" 动态插件结果: {result}")
print("\n6. 获取插件信息:")
greeter = PluginRegistry.get_plugin("greeter")
if greeter:
print(f" 插件名称: {greeter.get_name()}")
print(f" 插件版本: {greeter.get_version()}")
print("\n7. 批量执行所有插件:")
all_plugins = PluginRegistry.get_all_plugins()
for name, plugin in all_plugins.items():
print(f" 执行 {name}: ", end="")
if name == "greeter":
plugin.execute("All Plugins")
elif name == "calculator":
plugin.execute("add", 10, 20)
elif name == "uppercase":
plugin.execute("test")
elif name == "reverseplugin":
plugin.execute("abcd")
elif name == "dynamic_doubler":
result = plugin.execute(15)
print(f"结果: {result}", end="")
print()
# 模拟从包加载插件
print("\n8. 模拟从包加载插件:")
print(" 调用 PluginRegistry.load_plugins_from_package('my_plugins')")
print(" (假设my_plugins包中有更多插件)")
这种模式为框架、CLI和可扩展系统提供动力。
写在最后
大多数开发者认为架构是"以后学"的东西。这就是为什么你会在凌晨3点重写整个模块。
真相是什么?
模式是可扩展系统和慢动作灾难之间的区别。
当你开始有意识地使用它们时,你的代码会从"勉强能用"跃升到"这维护性也太好了吧"。
如果你已经用Python编码多年,并希望提升水平——这就是真正成长的开始。
实践建议
- 从一个小模式开始:不要试图一次性应用所有模式。先从依赖注入或策略模式开始
- 重构旧代码:选择一个小型旧项目,尝试用这些模式重构部分代码
- 代码审查中关注模式:在团队代码审查中,讨论设计模式的使用
- 学习经典设计模式:深入研究GoF的23种设计模式,理解其背后的原则
你在项目中用过哪些设计模式?有没有遇到过特别适合或特别不适合使用设计模式的场景?欢迎在评论区分享你的经验和思考!
