找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

225

积分

0

好友

29

主题
发表于 14 小时前 | 查看: 2| 回复: 0

事件驱动架构以系统中的事件或通知为核心,能有效解耦代码库的各个部分,并便于系统扩展以增强功能或提升性能。这种架构使得引入新变更时影响最小化,是Python开发中常用的模式。

在最简单场景下,事件驱动架构处理离散事件,例如当特定条件变化时触发操作或发送警报。其中,信息生产者负责发送事件,消费者则接收并响应这些事件。典型实现方式包括使用消息代理和不用消息代理两种。

使用消息代理

消息代理充当数据传输的中间件。生产者将数据(即消息)发布到代理上的特定主题,主题作为唯一标识符区分不同消息通道。消费者使用相同标识符订阅主题,代理将消息转发给所有订阅者。这种系统常称为发布者/订阅者(pub/sub)模式。以下以PyPubSub应用为例:

PyPubSub适用于单进程应用程序,无法跨进程或系统通信(其他库可提供此功能)。通过pip install pypubsub安装,其核心概念包括:

  • 主题:消息类别或地址,用点分隔字符串表示(如'weather.update')。
  • 消息:发布者发送的数据,可以是任何Python对象。
  • 发布者:发送消息的代码,指定主题和数据。
  • 订阅者:接收消息的代码,注册函数到特定主题。
  • 监听器:订阅者注册的函数本身。
from pubsub import pub

def news_listener(headline, content):  # 订阅者1
    print(f"【新闻快讯】{headline}")
    print(f"内容:{content}")

def sports_news_listener(score):  # 订阅者2
    print(f"【体育新闻】主队得分:{score}")

def main():
    pub.subscribe(news_listener, 'news')
    pub.subscribe(sports_news_listener, 'sports')

    pub.sendMessage('news', headline="AI取得新突破", content="研究人员发现了...")
    pub.sendMessage('sports', score=108)

if __name__ == "__main__":
    main()
观察者模式

观察者模式定义了一对多的依赖关系,让多个观察者监听同一主题对象。当主题状态变化时,所有观察者自动更新。在算法与数据结构中,这种模式常用于处理对象间状态同步。

以下以人员信息管理为例:某系统中,财务部门关注工资变化,领导机关关注学历更新。当员工信息变更时,系统自动通知相关部门。

import time

class Observe:  # 观察者抽象类
    def __init__(self, sub: list[str]):
        self.sub = sub
    def notify(self):
        pass

class Subject:  # 被观察对象
    def __init__(self, name: str, age: int, education: str, salary: int, observes: list[Observe] = []):
        self.name = name
        self._age = age
        self._education = education
        self._salary = salary
        self.observes = observes

    def add_observe(self, observe: Observe) -> None:
        self.observes.append(observe)

    def remove_observe(self, observe: Observe) -> None:
        self.observes.remove(observe)

    def notify_observes(self, sub: str) -> None:
        for observe in self.observes:
            if sub in observe.sub:
                observe.notify()

    @property
    def age(self) -> int:
        return self._age

    @age.setter
    def age(self, value: int) -> None:
        self._age = value
        self.notify_observes("age")

    @property
    def salary(self) -> int:
        return self._salary

    @salary.setter
    def salary(self, value: int) -> None:
        self._salary = value
        self.notify_observes("salary")

    @property
    def education(self) -> str:
        return self._education

    @education.setter
    def education(self, value: str) -> None:
        self._education = value
        self.notify_observes("education")

class observe_salary(Observe):  # 工资观察者
    def __init__(self, subject: Subject):
        super().__init__(["salary"])
        self.subject = subject
    def notify(self):
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} {self.subject.name} Salary changed, new salary is: {self.subject.salary}元/月")

class observe_education(Observe):  # 学历、年龄观察者
    def __init__(self, subject: Subject, old_age: int, old_education: str) -> None:
        super().__init__(["education", "age"])
        self.subject = subject
        self.old_age = old_age
        self.old_education = old_education
    def notify(self):
        if self.old_education != self.subject.education:
            print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} {self.subject.name} Education changed, new education is: {self.subject.education}")
            self.old_education = self.subject.education
        if self.old_age != self.subject.age:
            print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} {self.subject.name} Age changed, it is: {self.subject.age}岁")
            self.old_age = self.subject.age

# 测试
subject = Subject("张三", 25, "本科", 50000)
subject.add_observe(observe_salary(subject))
subject.add_observe(observe_education(subject, subject.age, subject.education))
subject.salary = 60000
time.sleep(3)
subject.education = "硕士"
subject.age = 27
time.sleep(3)
subject.education = "博士"
事件流

对于简单离散事件,消息代理和观察者模式足够有效。但当系统需要处理连续事件流时,需采用响应式编程模型,定义事件处理工作流。

响应式编程以事件流为核心架构风格。数据源作为流生产者,多个观察者串联处理数据变化。ReactiveX的Python实现RxPy(pip install rx)提供以下核心概念:

  • Observable:数据流源头,可发出多个数据或错误。
  • Observer:处理数据,包含on_next、on_error、on_completed方法。
  • Operators:对流进行转换、过滤等操作。
  • Subscription:表示监听关系,可取消以释放资源。

以下示例监听用户输入:


import rx
from rx import operators as op
import threading
import time

def listen_for_input(observer, scheduler):
    """模拟异步输入监听器"""
    def run():
        while True:
            user_input = input("Enter something (or 'quit' to exit): ")
            if user_input.lower() == 'quit':
                observer.on_completed()
                break
            else:
                observer.on_next(user_input)
    thread = threading.Thread(target=run)
    thread.start()

input_observable = rx.create(listen_for_input)

subscription = input_observable.pipe(
    op.map(lambda text: f"你输入了: {text}"),
    op.throttle_first(2)  # 2秒内只接受第一个输入
).subscribe(
    on_next=lambda msg: print(msg),
    on_completed=lambda: print("再见!")
)

# 实际应用中可取消订阅
# subscription.dispose()
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区(YunPan.Plus) ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-1 14:51 , Processed in 0.056299 second(s), 37 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 CloudStack.

快速回复 返回顶部 返回列表