找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

848

积分

0

好友

110

主题
发表于 3 小时前 | 查看: 2| 回复: 0

和云朵君一起学习数据分析与挖掘

大家好,我是云朵君!

架构决定了代码的寿命,而模式决定了架构的质量

你是否曾回头审视自己半年前写的Python项目,然后心想:
这堆垃圾是谁写的?哦等等…好像是我自己。

别担心,每个真正的开发者都有这样的时刻。问题不在于你的编程技巧,而在于你的架构。而架构,正是90%技术债务的诞生地。

经过四年多构建从“可爱小脚本”到“为什么这东西要吃12GB内存?”的系统后,我学到了一个痛苦的教训:
语法拯救不了项目,模式才能。

今天,云朵君要分享给你10个Python模式,在构建任何像样的“大项目”之前,你应该把它们刻在脑子里。

这些不是你在每个YouTube教程里看到的基础模式。这些是我多么希望有人在我写下4万行意大利面条式代码之前,就甩在我脸上的模式。

引言:为什么你需要这些模式?

想象一下这个场景:你的项目从简单的脚本成长为有多个模块、需要团队协作的完整应用。突然之间:

  • 添加新功能需要修改5个不同的文件  
  • 测试变得异常困难  
  • 简单的修改引发意想不到的连锁反应  
  • 新同事需要一周才能理解你的代码结构  

这就是缺乏设计模式的代价。而下面的10个模式,正是解决这些问题的钥匙。

1. 依赖注入:告别硬编码的紧耦合

大多数开发者不用依赖注入,因为他们觉得“Python是动态语言嘛”。确实…直到你尝试测试一个自己实例化数据库连接的类。

为什么重要?

  • 实现真正的解耦  
  • 便于单元测试(可以轻松模拟依赖)  
  • 更容易替换实现而不重写逻辑  
# 错误的做法:硬编码依赖
class BadUserManager:
    def __init__(self):
        self.email_service = EmailService()  # 硬编码!

    def register(self, user):
        # 业务逻辑
        self.email_service.send(user, "Welcome!")

# 正确的做法:依赖注入
class EmailService:
    def send(self, to, message):
        print(f"发送邮件给 {to}: {message}")

class UserManager:
    def __init__(self, email_service):  # 依赖作为参数传入
        self.email_service = email_service

    def register(self, user):
        # 业务逻辑
        self.email_service.send(user, "欢迎!")

# 使用
email_service = EmailService()
manager = UserManager(email_service)  # 注入依赖
manager.register("test@example.com")

# 测试时可以轻松模拟
class MockEmailService:
    def send(self, to, message):
        print(f"[测试] 模拟发送给 {to}")

test_manager = UserManager(MockEmailService())
test_manager.register("test@example.com")

实战技巧:在FastAPI或Django中,依赖注入常用于数据库会话、配置等。试试看,你的测试代码会变得多么干净!

2. 策略模式:告别if/else的丛林

如果你的代码有一个巨大的决策树,恭喜你——你创造了一个怪物。策略模式用清晰、可互换的行为替换了这团乱麻。

# 常见反模式:if/else地狱
def process_payment(amount, payment_method):
    if payment_method == "paypal":
        # 处理PayPal支付
        pass
    elif payment_method == "stripe":
        # 处理Stripe支付
        pass
    elif payment_method == "alipay":
        # 处理支付宝支付
        pass
    # ... 更多elif

# 优雅的解决方案:策略模式
from abc import ABC, abstractmethod

class PaymentStrategy(ABC):
    """支付策略抽象基类"""
    @abstractmethod
    def pay(self, amount):
        pass

class PayPalPayment(PaymentStrategy):
    def pay(self, amount):
        print(f"通过PayPal支付 {amount} 元")
        # 实际的PayPal API调用
        return True

class StripePayment(PaymentStrategy):
    def pay(self, amount):
        print(f"通过Stripe支付 {amount} 元")
        # 实际的Stripe API调用
        return True

class AlipayPayment(PaymentStrategy):
    def pay(self, amount):
        print(f"通过支付宝支付 {amount} 元")
        # 实际的支付宝API调用
        return True

class PaymentProcessor:
    def __init__(self):
        self.strategies = {
            "paypal": PayPalPayment(),
            "stripe": StripePayment(),
            "alipay": AlipayPayment()
        }

    def process(self, method, amount):
        strategy = self.strategies.get(method)
        if not strategy:
            raise ValueError(f"不支持的支付方式: {method}")
        return strategy.pay(amount)

# 使用
processor = PaymentProcessor()
processor.process("paypal", 100)
processor.process("alipay", 200)

# 添加新支付方式非常容易
class WeChatPayment(PaymentStrategy):
    def pay(self, amount):
        print(f"通过微信支付 {amount} 元")
        return True

processor.strategies["wechat"] = WeChatPayment()

设计优势:新增支付方式只需添加新类,无需修改现有代码——这符合开闭原则

3. 建造者模式:当构造函数变得臃肿时

曾经有一个类有九个可选参数吗?是的。那不是构造函数——那是人质劫持现场。

# 问题:太多参数的构造函数
class User:
    def __init__(self, name, email=None, phone=None, age=None,
                 address=None, city=None, country=None, 
                 is_active=True, is_admin=False):
        # 初始化所有属性...
        pass

# 使用起来很痛苦
user = User(
    name="张三",
    email="zhangsan@example.com",
    phone="13800138000",
    age=25,
    address="某街道123号",
    city="北京",
    country="中国",
    is_active=True,
    is_admin=False
)

# 解决方案:建造者模式
class QueryBuilder:
    def __init__(self, table="users"):
        self.table = table
        self._select_fields = ["*"]
        self._where_conditions = []
        self._order_by_fields = []
        self._limit_value = None

    def select(self, *fields):
        self._select_fields = fields
        return self

    def where(self, condition):
        self._where_conditions.append(condition)
        return self

    def order_by(self, field, descending=False):
        direction = "DESC" if descending else "ASC"
        self._order_by_fields.append(f"{field} {direction}")
        return self

    def limit(self, count):
        self._limit_value = count
        return self

    def build(self):
        # 构建SELECT部分
        select_clause = f"SELECT {', '.join(self._select_fields)} FROM {self.table}"

        # 构建WHERE部分
        where_clause = ""
        if self._where_conditions:
            where_clause = " WHERE " + " AND ".join(self._where_conditions)

        # 构建ORDER BY部分
        order_clause = ""
        if self._order_by_fields:
            order_clause = " ORDER BY " + ", ".join(self._order_by_fields)

        # 构建LIMIT部分
        limit_clause = ""
        if self._limit_value:
            limit_clause = f" LIMIT {self._limit_value}"

        return select_clause + where_clause + order_clause + limit_clause

# 流畅的API调用
query = (
    QueryBuilder("users")
    .select("id", "name", "email")
    .where("age > 18")
    .where("is_active = TRUE")
    .order_by("created_at", descending=True)
    .limit(10)
    .build()
)

print(query)
# 输出: SELECT id, name, email FROM users WHERE age > 18 AND is_active = TRUE ORDER BY created_at DESC LIMIT 10

现代应用SQLAlchemy的查询接口、Django的Q对象、Pydantic的模型配置都使用了类似的建造者模式。

4. 事件驱动模式:当应用需要应对高并发时

大型系统崩溃往往因为一切依赖于一切。解决方案是:事件,而不是链式调用

class EventBus:
    """事件总线 - 组件通信的中枢"""
    def __init__(self):
        self._subscribers = {}

    def subscribe(self, event_type, callback):
        """订阅事件"""
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []

        if callback not in self._subscribers[event_type]:
            self._subscribers[event_type].append(callback)

    def unsubscribe(self, event_type, callback):
        """取消订阅"""
        if event_type in self._subscribers:
            self._subscribers[event_type].remove(callback)

    def publish(self, event_type, data=None):
        """发布事件"""
        if event_type in self._subscribers:
            for callback in self._subscribers[event_type]:
                try:
                    callback(data)
                except Exception as e:
                    print(f"事件处理出错: {e}")

    def get_subscriber_count(self, event_type):
        """获取订阅者数量"""
        return len(self._subscribers.get(event_type, []))

# 定义事件类型
USER_REGISTERED = "user_registered"
ORDER_CREATED = "order_created"
PAYMENT_SUCCESS = "payment_success"

# 创建事件总线实例
bus = EventBus()

# 事件处理器
def send_welcome_email(user_data):
    print(f"[邮件服务] 发送欢迎邮件给 {user_data.get('email')}")

def init_user_storage(user_data):
    print(f"[存储服务] 为用户 {user_data.get('username')} 初始化存储空间")

def log_user_registration(user_data):
    print(f"[日志服务] 用户注册: {user_data}")

# 订阅事件
bus.subscribe(USER_REGISTERED, send_welcome_email)
bus.subscribe(USER_REGISTERED, init_user_storage)
bus.subscribe(USER_REGISTERED, log_user_registration)

# 发布事件
new_user = {
    "id": 123,
    "username": "zhangsan",
    "email": "zhangsan@example.com",
    "registered_at": "2024-01-01 10:00:00"
}

print("用户注册事件触发...")
bus.publish(USER_REGISTERED, new_user)
print(f"事件处理完成,共有{bus.get_subscriber_count(USER_REGISTERED)}个订阅者")

应用场景:微服务通信、插件系统、GUI应用的事件处理。这种模式比治疗更能解耦有毒的关系。

5. 仓储模式:阻止SQL泄漏到业务逻辑中

如果SQL查询生活在你的业务逻辑中,那么你正在邀请混乱。

import sqlite3
from contextlib import contextmanager
from typing import List, Optional, Dict, Any

@contextmanager
def get_db_connection():
    """数据库连接上下文管理器"""
    conn = sqlite3.connect(":memory:")  # 使用内存数据库示例
    conn.row_factory = sqlite3.Row  # 返回字典-like的行
    try:
        yield conn
    finally:
        conn.close()

class UserRepository:
    """用户数据仓储"""
    def __init__(self, db_connection):
        self.db = db_connection
        self._create_table()

    def _create_table(self):
        """创建用户表"""
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL UNIQUE,
                email TEXT NOT NULL UNIQUE,
                age INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.db.commit()

    def get_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
        """根据ID获取用户"""
        cursor = self.db.execute(
            "SELECT * FROM users WHERE id = ?",
            (user_id,)
        )
        row = cursor.fetchone()
        return dict(row) if row else None

    def get_by_email(self, email: str) -> Optional[Dict[str, Any]]:
        """根据邮箱获取用户"""
        cursor = self.db.execute(
            "SELECT * FROM users WHERE email = ?",
            (email,)
        )
        row = cursor.fetchone()
        return dict(row) if row else None

    def get_all(self, limit: int = 100) -> List[Dict[str, Any]]:
        """获取所有用户"""
        cursor = self.db.execute(
            "SELECT * FROM users LIMIT ?",
            (limit,)
        )
        return [dict(row) for row in cursor.fetchall()]

    def add(self, username: str, email: str, age: int = None) -> int:
        """添加用户"""
        cursor = self.db.execute(
            "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
            (username, email, age)
        )
        self.db.commit()
        return cursor.lastrowid

    def update(self, user_id: int, **kwargs) -> bool:
        """更新用户信息"""
        if not kwargs:
            return False

        set_clause = ", ".join([f"{k} = ?" for k in kwargs.keys()])
        values = list(kwargs.values())
        values.append(user_id)

        self.db.execute(
            f"UPDATE users SET {set_clause} WHERE id = ?",
            values
        )
        self.db.commit()
        return True

    def delete(self, user_id: int) -> bool:
        """删除用户"""
        cursor = self.db.execute(
            "DELETE FROM users WHERE id = ?",
            (user_id,)
        )
        self.db.commit()
        return cursor.rowcount > 0

# 使用示例
with get_db_connection() as conn:
    repo = UserRepository(conn)

    # 添加用户
    user_id = repo.add("zhangsan", "zhangsan@example.com", 25)
    print(f"添加用户成功,ID: {user_id}")

    # 查询用户
    user = repo.get_by_id(user_id)
    print(f"查询用户: {user}")

    # 更新用户
    repo.update(user_id, age=26, email="new_email@example.com")

    # 再次查询
    updated_user = repo.get_by_id(user_id)
    print(f"更新后用户: {updated_user}")

    # 获取所有用户
    all_users = repo.get_all()
    print(f"总用户数: {len(all_users)}")

架构优势:更换数据库?只需更新一个文件。这就是架构的力量。

6. 映射器模式:你的数据 ≠ 你的对象

将数据结构映射到Python对象可以给你一致性,并消除"字典末日"问题。

from datetime import datetime
from typing import Optional

class User:
    """领域模型:用户实体"""
    def __init__(self, id: int, username: str, email: str, 
                 age: Optional[int] = None, created_at: Optional[datetime] = None):
        self.id = id
        self.username = username
        self.email = email
        self.age = age
        self.created_at = created_at or datetime.now()

    def __repr__(self):
        return f"User(id={self.id}, username='{self.username}', email='{self.email}')"

    def is_adult(self) -> bool:
        return self.age is not None and self.age >= 18

    def get_display_name(self) -> str:
        return f"{self.username} ({self.email})"

class UserMapper:
    """用户映射器:在领域模型和数据模型之间转换"""

    @staticmethod
    def from_dict(data: dict) -> User:
        """从字典创建User对象"""
        return User(
            id=data.get("id"),
            username=data["username"],
            email=data["email"],
            age=data.get("age"),
            created_at=datetime.fromisoformat(data["created_at"]) if "created_at" in data else None
        )

    @staticmethod
    def from_database_row(row) -> User:
        """从数据库行创建User对象"""
        return User(
            id=row["id"],
            username=row["username"],
            email=row["email"],
            age=row["age"],
            created_at=datetime.fromisoformat(row["created_at"]) if row["created_at"] else None
        )

    @staticmethod
    def to_dict(user: User) -> dict:
        """将User对象转为字典"""
        return {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "age": user.age,
            "created_at": user.created_at.isoformat() if user.created_at else None,
            "is_adult": user.is_adult(),
            "display_name": user.get_display_name()
        }

    @staticmethod
    def to_database_params(user: User) -> tuple:
        """将User对象转为数据库参数"""
        return (
            user.username,
            user.email,
            user.age,
            user.created_at.isoformat() if user.created_at else None
        )

# 示例:从API JSON到领域对象
api_response = {
    "id": 123,
    "username": "zhangsan",
    "email": "zhangsan@example.com",
    "age": 25,
    "created_at": "2024-01-01T10:00:00"
}

# 映射到领域对象
user = UserMapper.from_dict(api_response)
print(f"领域对象: {user}")
print(f"是否成人: {user.is_adult()}")
print(f"显示名称: {user.get_display_name()}")

# 映射回字典
user_dict = UserMapper.to_dict(user)
print(f"回传字典: {user_dict}")

# 示例:从数据库到领域对象
class MockDBRow:
    """模拟数据库行"""
    def __init__(self):
        self.data = {
            "id": 456,
            "username": "lisi",
            "email": "lisi@example.com",
            "age": 17,
            "created_at": "2024-01-02T09:00:00"
        }

    def __getitem__(self, key):
        return self.data[key]

db_row = MockDBRow()
user_from_db = UserMapper.from_database_row(db_row)
print(f"\n从数据库创建的用户: {user_from_db}")
print(f"是否成人: {user_from_db.is_adult()}")

API给你JSON,你的代码值得拥有结构。

7. 管道模式:当任务需要流畅执行时

完美适用于机器学习预处理、ETL,或任何你的逻辑有多个阶段的情况。

from abc import ABC, abstractmethod
from typing import Any, List

class PipelineStep(ABC):
    """管道步骤抽象基类"""
    @abstractmethod
    def process(self, data: Any) -> Any:
        pass

    def __call__(self, data: Any) -> Any:
        return self.process(data)

class TextCleanStep(PipelineStep):
    """文本清洗步骤"""
    def process(self, text: str) -> str:
        # 移除多余空格
        text = ' '.join(text.split())
        # 转换为小写
        text = text.lower()
        # 移除特殊字符(简单示例)
        text = ''.join(c for c in text if c.isalnum() or c.isspace())
        return text

class TokenizeStep(PipelineStep):
    """分词步骤"""
    def __init__(self, tokenizer=None):
        self.tokenizer = tokenizer or str.split

    def process(self, text: str) -> List[str]:
        return self.tokenizer(text)

class StopwordRemovalStep(PipelineStep):
    """停用词移除步骤"""
    def __init__(self, stopwords=None):
        self.stopwords = stopwords or {"a", "an", "the", "and", "or", "but", "in", "on", "at"}

    def process(self, tokens: List[str]) -> List[str]:
        return [token for token in tokens if token not in self.stopwords]

class StemmingStep(PipelineStep):
    """词干提取步骤(简化版)"""
    def process(self, tokens: List[str]) -> List[str]:
        # 简化版的词干提取
        stemmed = []
        for token in tokens:
            if token.endswith("ing"):
                token = token[:-3]
            elif token.endswith("ed"):
                token = token[:-2]
            elif token.endswith("s"):
                token = token[:-1]
            stemmed.append(token)
        return stemmed

class Pipeline:
    """管道:连接多个处理步骤"""
    def __init__(self, steps: List[PipelineStep] = None):
        self.steps = steps or []

    def add_step(self, step: PipelineStep):
        """添加处理步骤"""
        self.steps.append(step)
        return self  # 支持链式调用

    def process(self, data: Any) -> Any:
        """处理数据"""
        result = data
        for step in self.steps:
            print(f"执行步骤: {step.__class__.__name__}")
            result = step.process(result)
            print(f"当前结果: {result}")
        return result

    def __call__(self, data: Any) -> Any:
        return self.process(data)

# 构建文本处理管道
text_pipeline = Pipeline()
text_pipeline.add_step(TextCleanStep())
text_pipeline.add_step(TokenizeStep())
text_pipeline.add_step(StopwordRemovalStep())
text_pipeline.add_step(StemmingStep())

# 处理文本
sample_text = "  I am RUNNING in the park and playing with dogs!  "
print("原始文本:", sample_text)
print("\n开始管道处理...")
final_result = text_pipeline.process(sample_text)

print(f"\n最终结果: {final_result}")

# 扩展:可以轻松添加新步骤
class SpellCheckStep(PipelineStep):
    """拼写检查步骤(示例)"""
    def process(self, tokens: List[str]) -> List[str]:
        # 这里可以实现实际的拼写检查逻辑
        corrected = []
        for token in tokens:
            # 简单示例:修正常见拼写错误
            if token == "runn":
                token = "run"
            corrected.append(token)
        return corrected

# 添加新步骤到现有管道
text_pipeline.add_step(SpellCheckStep())
new_result = text_pipeline.process("I am runn in the parc")
print(f"\n包含拼写检查的结果: {new_result}")

管道保持你的大脑——和你的项目——整洁有序。

8. 命令模式:撤销、重做和"再来一次"

如果你的项目需要可逆操作,这是你的救星。

from abc import ABC, abstractmethod
from typing import List, Dict, Any

class Command(ABC):
    """命令抽象基类"""
    @abstractmethod
    def execute(self) -> Any:
        """执行命令"""
        pass

    @abstractmethod
    def undo(self) -> Any:
        """撤销命令"""
        pass

    def __str__(self):
        return self.__class__.__name__

class TextEditor:
    """文本编辑器(接收者)"""
    def __init__(self):
        self.text = ""
        self.history: List[str] = []

    def add_text(self, new_text: str):
        """添加文本"""
        self.history.append(self.text)
        self.text += new_text
        print(f"添加文本: '{new_text}'")
        print(f"当前文本: '{self.text}'")

    def delete_last_word(self):
        """删除最后一个单词"""
        if self.text:
            self.history.append(self.text)
            words = self.text.split()
            if words:
                last_word = words[-1]
                self.text = ' '.join(words[:-1])
                print(f"删除最后一个单词: '{last_word}'")
                print(f"当前文本: '{self.text}'")
                return last_word
        return ""

    def clear_text(self):
        """清空文本"""
        self.history.append(self.text)
        self.text = ""
        print("清空文本")

    def restore_from_history(self):
        """从历史恢复"""
        if self.history:
            self.text = self.history.pop()
            print(f"恢复到: '{self.text}'")

class AddTextCommand(Command):
    """添加文本命令"""
    def __init__(self, editor: TextEditor, text: str):
        self.editor = editor
        self.text = text
        self.previous_text = ""

    def execute(self) -> None:
        self.previous_text = self.editor.text
        self.editor.add_text(self.text)

    def undo(self) -> None:
        self.editor.text = self.previous_text
        print(f"撤销添加文本,恢复为: '{self.editor.text}'")

class DeleteLastWordCommand(Command):
    """删除最后一个单词命令"""
    def __init__(self, editor: TextEditor):
        self.editor = editor
        self.deleted_word = ""
        self.previous_text = ""

    def execute(self) -> None:
        self.previous_text = self.editor.text
        self.deleted_word = self.editor.delete_last_word()

    def undo(self) -> None:
        self.editor.text = self.previous_text
        print(f"撤销删除,恢复为: '{self.editor.text}'")

class ClearTextCommand(Command):
    """清空文本命令"""
    def __init__(self, editor: TextEditor):
        self.editor = editor
        self.previous_text = ""

    def execute(self) -> None:
        self.previous_text = self.editor.text
        self.editor.clear_text()

    def undo(self) -> None:
        self.editor.text = self.previous_text
        print(f"撤销清空,恢复为: '{self.editor.text}'")

class CommandHistory:
    """命令历史管理器"""
    def __init__(self):
        self.history: List[Command] = []
        self.undo_history: List[Command] = []

    def execute_command(self, command: Command):
        """执行命令并记录"""
        command.execute()
        self.history.append(command)
        self.undo_history.clear()  # 新的执行清除重做历史
        print(f"执行命令: {command}")

    def undo(self):
        """撤销上一个命令"""
        if self.history:
            command = self.history.pop()
            command.undo()
            self.undo_history.append(command)
            print(f"撤销命令: {command}")
        else:
            print("没有可撤销的命令")

    def redo(self):
        """重做上一个撤销的命令"""
        if self.undo_history:
            command = self.undo_history.pop()
            command.execute()
            self.history.append(command)
            print(f"重做命令: {command}")
        else:
            print("没有可重做的命令")

    def get_history(self):
        """获取命令历史"""
        return [str(cmd) for cmd in self.history]

# 使用示例
editor = TextEditor()
history = CommandHistory()

print("=== 文本编辑器演示 ===\n")

# 执行一系列命令
add_hello = AddTextCommand(editor, "Hello ")
history.execute_command(add_hello)

add_world = AddTextCommand(editor, "World ")
history.execute_command(add_world)

add_python = AddTextCommand(editor, "Python ")
history.execute_command(add_python)

print(f"\n当前文本: '{editor.text}'")
print(f"命令历史: {history.get_history()}")

# 撤销操作
print("\n--- 执行撤销 ---")
history.undo()
print(f"撤销后文本: '{editor.text}'")

history.undo()
print(f"再撤销后文本: '{editor.text}'")

# 重做操作
print("\n--- 执行重做 ---")
history.redo()
print(f"重做后文本: '{editor.text}'")

# 执行删除命令
print("\n--- 执行删除 ---")
delete_cmd = DeleteLastWordCommand(editor)
history.execute_command(delete_cmd)

# 执行清空命令
print("\n--- 执行清空 ---")
clear_cmd = ClearTextCommand(editor)
history.execute_command(clear_cmd)

# 多次撤销
print("\n--- 多次撤销 ---")
for _ in range(3):
    history.undo()

print(f"\n最终文本: '{editor.text}'")
print(f"最终历史: {history.get_history()}")

UI应用、CLI工具、编辑器——这种模式无处不在。

9. 规范模式:不会腐烂的过滤逻辑

当过滤器变得复杂时,规范模式保持其优雅。

from abc import ABC, abstractmethod
from typing import List, Dict, Any
from datetime import datetime, timedelta

class Specification(ABC):
    """规范抽象基类"""
    @abstractmethod
    def is_satisfied(self, item: Dict[str, Any]) -> bool:
        """检查项是否满足规范"""
        pass

    def __and__(self, other: 'Specification') -> 'AndSpecification':
        """与操作"""
        return AndSpecification(self, other)

    def __or__(self, other: 'Specification') -> 'OrSpecification':
        """或操作"""
        return OrSpecification(self, other)

    def __invert__(self) -> 'NotSpecification':
        """非操作"""
        return NotSpecification(self)

    def __call__(self, item: Dict[str, Any]) -> bool:
        """使规范可调用"""
        return self.is_satisfied(item)

class AgeAboveSpecification(Specification):
    """年龄大于规范"""
    def __init__(self, min_age: int):
        self.min_age = min_age

    def is_satisfied(self, user: Dict[str, Any]) -> bool:
        age = user.get('age')
        return age is not None and age > self.min_age

    def __str__(self):
        return f"Age > {self.min_age}"

class HasEmailSpecification(Specification):
    """有邮箱规范"""
    def is_satisfied(self, user: Dict[str, Any]) -> bool:
        email = user.get('email')
        return bool(email and '@' in email)

    def __str__(self):
        return "Has Email"

class IsActiveSpecification(Specification):
    """活跃用户规范"""
    def __init__(self, days_threshold: int = 30):
        self.days_threshold = days_threshold

    def is_satisfied(self, user: Dict[str, Any]) -> bool:
        last_login = user.get('last_login')
        if not last_login:
            return False

        if isinstance(last_login, str):
            last_login = datetime.fromisoformat(last_login)

        days_since_login = (datetime.now() - last_login).days
        return days_since_login <= self.days_threshold

    def __str__(self):
        return f"Active within {self.days_threshold} days"

class BalanceAboveSpecification(Specification):
    """余额大于规范"""
    def __init__(self, min_balance: float):
        self.min_balance = min_balance

    def is_satisfied(self, user: Dict[str, Any]) -> bool:
        balance = user.get('balance', 0)
        return balance >= self.min_balance

    def __str__(self):
        return f"Balance >= {self.min_balance}"

class AndSpecification(Specification):
    """与规范"""
    def __init__(self, *specs: Specification):
        self.specs = specs

    def is_satisfied(self, item: Dict[str, Any]) -> bool:
        return all(spec(item) for spec in self.specs)

    def __str__(self):
        return " AND ".join(str(spec) for spec in self.specs)

class OrSpecification(Specification):
    """或规范"""
    def __init__(self, *specs: Specification):
        self.specs = specs

    def is_satisfied(self, item: Dict[str, Any]) -> bool:
        return any(spec(item) for spec in self.specs)

    def __str__(self):
        return " OR ".join(str(spec) for spec in self.specs)

class NotSpecification(Specification):
    """非规范"""
    def __init__(self, spec: Specification):
        self.spec = spec

    def is_satisfied(self, item: Dict[str, Any]) -> bool:
        return not self.spec(item)

    def __str__(self):
        return f"NOT ({self.spec})"

class UserFilter:
    """用户过滤器"""
    @staticmethod
    def filter_users(users: List[Dict[str, Any]], spec: Specification) -> List[Dict[str, Any]]:
        """根据规范过滤用户"""
        return [user for user in users if spec.is_satisfied(user)]

    @staticmethod
    def count_users(users: List[Dict[str, Any]], spec: Specification) -> int:
        """统计满足规范的用户数"""
        return sum(1 for user in users if spec.is_satisfied(user))

# 示例数据
users = [
    {"id": 1, "name": "张三", "age": 25, "email": "zhangsan@example.com", "balance": 1000.0, "last_login": "2024-01-15T10:00:00", "is_active": True},
    {"id": 2, "name": "李四", "age": 17, "email": "lisi@example.com", "balance": 500.0, "last_login": "2023-11-01T09:00:00", "is_active": True},
    {"id": 3, "name": "王五", "age": 30, "email": "", "balance": 2000.0, "last_login": "2024-01-20T14:00:00", "is_active": False},
    {"id": 4, "name": "赵六", "age": 22, "email": "zhaoliu@example.com", "balance": 300.0, "last_login": "2024-01-18T16:00:00", "is_active": True},
    {"id": 5, "name": "孙七", "age": 35, "email": "sunqi@example.com", "balance": 1500.0, "last_login": "2023-12-01T08:00:00", "is_active": True},
]

print("=== 用户过滤系统 ===\n")

# 基本规范
adult_spec = AgeAboveSpecification(18)
has_email_spec = HasEmailSpecification()
active_spec = IsActiveSpecification(days_threshold=30)
rich_spec = BalanceAboveSpecification(1000)

print("1. 成年用户:")
adults = UserFilter.filter_users(users, adult_spec)
print(f"  规范: {adult_spec}")
print(f"  数量: {len(adults)}")
for user in adults:
    print(f"  - {user['name']} ({user['age']}岁)")

print("\n2. 有邮箱的活跃成年用户:")
complex_spec = adult_spec & has_email_spec & active_spec
filtered = UserFilter.filter_users(users, complex_spec)
print(f"  规范: {complex_spec}")
print(f"  数量: {len(filtered)}")
for user in filtered:
    print(f"  - {user['name']}")

print("\n3. 有钱或活跃的用户:")
rich_or_active = rich_spec | active_spec
filtered = UserFilter.filter_users(users, rich_or_active)
print(f"  规范: {rich_or_active}")
print(f"  数量: {len(filtered)}")
for user in filtered:
    print(f"  - {user['name']} (余额: {user['balance']}, 最近登录: {user['last_login'][:10]})")

print("\n4. 非活跃用户:")
not_active = ~active_spec
filtered = UserFilter.filter_users(users, not_active)
print(f"  规范: {not_active}")
print(f"  数量: {len(filtered)}")
for user in filtered:
    print(f"  - {user['name']} (最后登录: {user['last_login'][:10]})")

print("\n5. 动态构建的复杂查询:")
# 查找:成年、有邮箱、余额>500,且(是活跃用户或余额>1500)
dynamic_spec = (
    adult_spec &
    has_email_spec &
    BalanceAboveSpecification(500) &
    (active_spec | BalanceAboveSpecification(1500))
)
print(f"  规范: {dynamic_spec}")

count = UserFilter.count_users(users, dynamic_spec)
print(f"  满足条件的用户数: {count}")

filtered = UserFilter.filter_users(users, dynamic_spec)
for user in filtered:
    print(f"  - {user['name']}: 年龄{user['age']}, 余额{user['balance']}, 活跃{user['is_active']}")

向迷宫般的过滤逻辑说再见。

10. 注册表模式:动态插件的智能方式

如果你的"插件系统"使用if name == 'plugin_a':,那么你并没有插件系统。

from abc import ABC, abstractmethod
from typing import Dict, Any, Callable, Optional, Type
import importlib
import pkgutil

class Plugin(ABC):
    """插件基类"""
    @abstractmethod
    def execute(self, *args, **kwargs) -> Any:
        """执行插件"""
        pass

    def get_name(self) -> str:
        """获取插件名称"""
        return self.__class__.__name__

    def get_version(self) -> str:
        """获取插件版本"""
        return "1.0.0"

class PluginRegistry:
    """插件注册表"""
    _plugins: Dict[str, Type[Plugin]] = {}
    _instances: Dict[str, Plugin] = {}

    @classmethod
    def register(cls, name: Optional[str] = None):
        """注册插件装饰器"""
        def decorator(plugin_cls: Type[Plugin]) -> Type[Plugin]:
            plugin_name = name or plugin_cls.__name__.lower()
            if plugin_name in cls._plugins:
                raise ValueError(f"插件 '{plugin_name}' 已注册")

            cls._plugins[plugin_name] = plugin_cls
            print(f"注册插件: {plugin_name} -> {plugin_cls.__name__}")
            return plugin_cls
        return decorator

    @classmethod
    def get_plugin(cls, name: str) -> Optional[Plugin]:
        """获取插件实例"""
        if name not in cls._instances:
            if name in cls._plugins:
                cls._instances[name] = cls._plugins[name]()
            else:
                return None
        return cls._instances.get(name)

    @classmethod
    def get_all_plugins(cls) -> Dict[str, Plugin]:
        """获取所有插件实例"""
        # 确保所有插件都已实例化
        for name in cls._plugins:
            if name not in cls._instances:
                cls._instances[name] = cls._plugins[name]()
        return cls._instances.copy()

    @classmethod
    def list_plugins(cls) -> list:
        """列出所有注册的插件"""
        return list(cls._plugins.keys())

    @classmethod
    def execute_plugin(cls, name: str, *args, **kwargs) -> Any:
        """执行指定插件"""
        plugin = cls.get_plugin(name)
        if not plugin:
            raise ValueError(f"未找到插件: {name}")

        print(f"执行插件: {name}")
        return plugin.execute(*args, **kwargs)

    @classmethod
    def load_plugins_from_package(cls, package_name: str):
        """从包中自动加载插件"""
        try:
            package = importlib.import_module(package_name)

            # 遍历包中的所有模块
            for _, module_name, is_pkg in pkgutil.iter_modules(package.__path__):
                if not is_pkg:
                    full_module_name = f"{package_name}.{module_name}"
                    importlib.import_module(full_module_name)

            print(f"从包 '{package_name}' 加载插件完成")
        except ImportError as e:
            print(f"加载包 '{package_name}' 失败: {e}")

# 定义一些插件
@PluginRegistry.register("greeter")
class GreeterPlugin(Plugin):
    """问候插件"""
    def execute(self, name: str = "World") -> str:
        greeting = f"Hello, {name}!"
        print(greeting)
        return greeting

    def get_version(self) -> str:
        return "2.0.0"

@PluginRegistry.register("calculator")
class CalculatorPlugin(Plugin):
    """计算器插件"""
    def execute(self, operation: str, a: float, b: float) -> float:
        operations = {
            "add": lambda x, y: x + y,
            "subtract": lambda x, y: x - y,
            "multiply": lambda x, y: x * y,
            "divide": lambda x, y: x / y if y != 0 else float('inf')
        }

        if operation not in operations:
            raise ValueError(f"不支持的操作: {operation}")

        result = operations[operation](a, b)
        print(f"{a}{operation}{b} = {result}")
        return result

@PluginRegistry.register("uppercase")
class UppercasePlugin(Plugin):
    """大写转换插件"""
    def execute(self, text: str) -> str:
        result = text.upper()
        print(f"大写转换: '{text}' -> '{result}'")
        return result

@PluginRegistry.register()  # 使用默认名称
class ReversePlugin(Plugin):
    """字符串反转插件"""
    def execute(self, text: str) -> str:
        result = text[::-1]
        print(f"反转: '{text}' -> '{result}'")
        return result

# 动态插件注册
def register_dynamic_plugin():
    """动态注册插件(例如从配置文件加载)"""

    class DynamicPlugin(Plugin):
        def __init__(self, name, func):
            self._name = name
            self._func = func

        def execute(self, *args, **kwargs):
            print(f"执行动态插件: {self._name}")
            return self._func(*args, **kwargs)

        def get_name(self):
            return self._name

    # 动态创建和注册插件
    def custom_function(x):
        return x * 2

    # 动态注册
    plugin_name = "dynamic_doubler"
    PluginRegistry._plugins[plugin_name] = type(
        "DynamicDoubler",
        (Plugin,),
        {
            "execute": lambda self, x: custom_function(x),
            "get_name": lambda self: plugin_name
        }
    )
    print(f"动态注册插件: {plugin_name}")

# 使用示例
print("=== 插件系统演示 ===\n")

# 列出所有插件
print("已注册的插件:")
for plugin_name in PluginRegistry.list_plugins():
    print(f"  - {plugin_name}")

print("\n1. 执行问候插件:")
PluginRegistry.execute_plugin("greeter", "Python开发者")

print("\n2. 执行计算器插件:")
result = PluginRegistry.execute_plugin("calculator", "multiply", 6, 7)
print(f"   结果: {result}")

print("\n3. 执行大写插件:")
PluginRegistry.execute_plugin("uppercase", "hello world")

print("\n4. 执行反转插件:")
PluginRegistry.execute_plugin("reverseplugin", "Python")  # 使用默认名称

print("\n5. 动态插件演示:")
register_dynamic_plugin()

# 执行动态插件
if "dynamic_doubler" in PluginRegistry.list_plugins():
    result = PluginRegistry.execute_plugin("dynamic_doubler", 21)
    print(f"   动态插件结果: {result}")

print("\n6. 获取插件信息:")
greeter = PluginRegistry.get_plugin("greeter")
if greeter:
    print(f"   插件名称: {greeter.get_name()}")
    print(f"   插件版本: {greeter.get_version()}")

print("\n7. 批量执行所有插件:")
all_plugins = PluginRegistry.get_all_plugins()
for name, plugin in all_plugins.items():
    print(f"  执行 {name}: ", end="")
    if name == "greeter":
        plugin.execute("All Plugins")
    elif name == "calculator":
        plugin.execute("add", 10, 20)
    elif name == "uppercase":
        plugin.execute("test")
    elif name == "reverseplugin":
        plugin.execute("abcd")
    elif name == "dynamic_doubler":
        result = plugin.execute(15)
        print(f"结果: {result}", end="")
    print()

# 模拟从包加载插件
print("\n8. 模拟从包加载插件:")
print("   调用 PluginRegistry.load_plugins_from_package('my_plugins')")
print("   (假设my_plugins包中有更多插件)")

这种模式为框架、CLI和可扩展系统提供动力。

写在最后

大多数开发者认为架构是"以后学"的东西。这就是为什么你会在凌晨3点重写整个模块。

真相是什么?
模式是可扩展系统和慢动作灾难之间的区别。

当你开始有意识地使用它们时,你的代码会从"勉强能用"跃升到"这维护性也太好了吧"。

如果你已经用Python编码多年,并希望提升水平——这就是真正成长的开始。

实践建议

  1. 从一个小模式开始:不要试图一次性应用所有模式。先从依赖注入或策略模式开始  
  2. 重构旧代码:选择一个小型旧项目,尝试用这些模式重构部分代码  
  3. 代码审查中关注模式:在团队代码审查中,讨论设计模式的使用  
  4. 学习经典设计模式:深入研究GoF的23种设计模式,理解其背后的原则  

你在项目中用过哪些设计模式?有没有遇到过特别适合或特别不适合使用设计模式的场景?欢迎在评论区分享你的经验和思考!

卡通小丑从纸箱中探出头来




上一篇:嵌入式TCP Socket封装指南:简化网络编程,提升代码复用
下一篇:英伟达开源PersonaPlex:告别机翻腔,全双工语音AI让角色扮演“活”了
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-29 23:42 , Processed in 0.342633 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表