事件驱动架构以系统中的事件或通知为核心,能有效解耦代码库的各个部分,并便于系统扩展以增强功能或提升性能。这种架构使得引入新变更时影响最小化,是Python开发中常用的模式。
在最简单场景下,事件驱动架构处理离散事件,例如当特定条件变化时触发操作或发送警报。其中,信息生产者负责发送事件,消费者则接收并响应这些事件。典型实现方式包括使用消息代理和不用消息代理两种。
使用消息代理
消息代理充当数据传输的中间件。生产者将数据(即消息)发布到代理上的特定主题,主题作为唯一标识符区分不同消息通道。消费者使用相同标识符订阅主题,代理将消息转发给所有订阅者。这种系统常称为发布者/订阅者(pub/sub)模式。以下以PyPubSub应用为例:
PyPubSub适用于单进程应用程序,无法跨进程或系统通信(其他库可提供此功能)。通过pip install pypubsub安装,其核心概念包括:
- 主题:消息类别或地址,用点分隔字符串表示(如'weather.update')。
- 消息:发布者发送的数据,可以是任何Python对象。
- 发布者:发送消息的代码,指定主题和数据。
- 订阅者:接收消息的代码,注册函数到特定主题。
- 监听器:订阅者注册的函数本身。
from pubsub import pub
def news_listener(headline, content): # 订阅者1
print(f"【新闻快讯】{headline}")
print(f"内容:{content}")
def sports_news_listener(score): # 订阅者2
print(f"【体育新闻】主队得分:{score}")
def main():
pub.subscribe(news_listener, 'news')
pub.subscribe(sports_news_listener, 'sports')
pub.sendMessage('news', headline="AI取得新突破", content="研究人员发现了...")
pub.sendMessage('sports', score=108)
if __name__ == "__main__":
main()
观察者模式
观察者模式定义了一对多的依赖关系,让多个观察者监听同一主题对象。当主题状态变化时,所有观察者自动更新。在算法与数据结构中,这种模式常用于处理对象间状态同步。
以下以人员信息管理为例:某系统中,财务部门关注工资变化,领导机关关注学历更新。当员工信息变更时,系统自动通知相关部门。
import time
class Observe: # 观察者抽象类
def __init__(self, sub: list[str]):
self.sub = sub
def notify(self):
pass
class Subject: # 被观察对象
def __init__(self, name: str, age: int, education: str, salary: int, observes: list[Observe] = []):
self.name = name
self._age = age
self._education = education
self._salary = salary
self.observes = observes
def add_observe(self, observe: Observe) -> None:
self.observes.append(observe)
def remove_observe(self, observe: Observe) -> None:
self.observes.remove(observe)
def notify_observes(self, sub: str) -> None:
for observe in self.observes:
if sub in observe.sub:
observe.notify()
@property
def age(self) -> int:
return self._age
@age.setter
def age(self, value: int) -> None:
self._age = value
self.notify_observes("age")
@property
def salary(self) -> int:
return self._salary
@salary.setter
def salary(self, value: int) -> None:
self._salary = value
self.notify_observes("salary")
@property
def education(self) -> str:
return self._education
@education.setter
def education(self, value: str) -> None:
self._education = value
self.notify_observes("education")
class observe_salary(Observe): # 工资观察者
def __init__(self, subject: Subject):
super().__init__(["salary"])
self.subject = subject
def notify(self):
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} {self.subject.name} Salary changed, new salary is: {self.subject.salary}元/月")
class observe_education(Observe): # 学历、年龄观察者
def __init__(self, subject: Subject, old_age: int, old_education: str) -> None:
super().__init__(["education", "age"])
self.subject = subject
self.old_age = old_age
self.old_education = old_education
def notify(self):
if self.old_education != self.subject.education:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} {self.subject.name} Education changed, new education is: {self.subject.education}")
self.old_education = self.subject.education
if self.old_age != self.subject.age:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} {self.subject.name} Age changed, it is: {self.subject.age}岁")
self.old_age = self.subject.age
# 测试
subject = Subject("张三", 25, "本科", 50000)
subject.add_observe(observe_salary(subject))
subject.add_observe(observe_education(subject, subject.age, subject.education))
subject.salary = 60000
time.sleep(3)
subject.education = "硕士"
subject.age = 27
time.sleep(3)
subject.education = "博士"
事件流
对于简单离散事件,消息代理和观察者模式足够有效。但当系统需要处理连续事件流时,需采用响应式编程模型,定义事件处理工作流。
响应式编程以事件流为核心架构风格。数据源作为流生产者,多个观察者串联处理数据变化。ReactiveX的Python实现RxPy(pip install rx)提供以下核心概念:
- Observable:数据流源头,可发出多个数据或错误。
- Observer:处理数据,包含on_next、on_error、on_completed方法。
- Operators:对流进行转换、过滤等操作。
- Subscription:表示监听关系,可取消以释放资源。
以下示例监听用户输入:
import rx
from rx import operators as op
import threading
import time
def listen_for_input(observer, scheduler):
"""模拟异步输入监听器"""
def run():
while True:
user_input = input("Enter something (or 'quit' to exit): ")
if user_input.lower() == 'quit':
observer.on_completed()
break
else:
observer.on_next(user_input)
thread = threading.Thread(target=run)
thread.start()
input_observable = rx.create(listen_for_input)
subscription = input_observable.pipe(
op.map(lambda text: f"你输入了: {text}"),
op.throttle_first(2) # 2秒内只接受第一个输入
).subscribe(
on_next=lambda msg: print(msg),
on_completed=lambda: print("再见!")
)
# 实际应用中可取消订阅
# subscription.dispose()