>>> from env_helper import info; info()
页面更新时间: 2024-01-14 22:45:59
运行环境:
    Linux发行版本: Debian GNU/Linux 12 (bookworm)
    操作系统内核: Linux-6.1.0-16-amd64-x86_64-with-glibc2.36
    Python版本: 3.11.2

5.3. 用发布订阅模式实现松耦合

发布订阅模式(publish/subscribe或pub/sub)是一种编程模式,消息的发送者(发布者) 不会发送其消息给特定的接收者(订阅者), 而是将发布的消息分为不同的类别直接发布,并 不关注订阅者是谁。 而订阅者可以对一个或多个类别感兴趣.且只接收感兴趣的消息,并且 不关注是哪个发布者发布的消息。 这种发布者和订阅者的解耦可以允许更好的可扩放性和更 为动态的网络拓扑,故受到了大家的喜爱。

发布订阅模式的优点是发布者与订阅者松散的耦合,双方不需要知道对方的存在。 由于主题是被关注的,发布者和订阅者可以对系统拓扑毫无所知。 无论对方是否存在,发送者和 订阅者都可以继续正常操作。 要实现这个模式,就需要有一个中间代理人,在实现中一般被 称为Broker,它维护着发布者和订阅者的关系: 订阅者把感兴趣的主题告诉它,而发布者的 信息也通过它路由到各个订阅者处。简单的实现如下:

>>> from collections import defaultdict
>>>
>>> route_table = defaultdict(list)
>>>
>>> def sub(self,topic, callback):
>>>     if callback in route_table[topic]:
>>>         return
>>>     route_table[topic].append(callback)
>>> def pub(self, topic,*a,**kw):
>>>     for func in route_tabLe[topic]:
>>>         func(*a,**kw)

这个实现非常简单,直接放在一个叫Broker.py的模块中(这显然是单件),省去了各种 参数检测、优先处理的需求等,甚至没有取消订阅的函数,但它的确展现了发布订阅模式实 现的最基础的结构,它的应用代码也可以运行。

import Broker

def greeting(name):

print ('Hello, %s.'%name)

Broker.sub('greet',greeting) Broker.pub('greet','LaiYonghao') #输出

相对于这个简化版本,blinker和python-message两个模块的实现要完备得多e blinker已 经被用在了多个广受欢迎的项目上,比如flask和django ;而python-message则支持更多丰 富的特性。本节以python-message的使用为例,讲解发布订阅模式的应用场景。

安装python-message相当简单,通过pip安装就可以了。

然后简单验证一下。

import message def hello(name):

print ('hello, %s.'%name)

message.sub('greet', hello) message.pub('greet','lai')

运行输出如下:

hello, lai.

接下来用它解决一些实际问题。假定你给项目组开发了一个程序库foo.里面有一个非 常重要的函数——bar。

>>> def bar():
>>>     print ('Haha, Calling bar().')
>>>     do_sth()

这个函数如此重要,所以你给它加上了一行输出代码,用以输出日志。后来你的这个程 序库foo被大量使用了,一直运行得很好,直到又一个新项目拖你过去“救火”,因为出了 bug无法査出原因,怀疑是foo的问题。你査看了很久日志,都没有发现他们调用bar()的痕 迹,一问,原来他们是用logging的,标准输出在做Daemon的时候被重定向到了/dev/null。 在临时修改了输出重定向以后,找到了 bug所在,并解决了。然后你开始着手解决这个问 题。一开始你想在你的foo库中引人logging,但原来的项目又不用logging.你在程序库里 引人logging,但谁来初始化它呢?就算你引人了 logging,则你们的项目可能是用logging. getLogger(‘prjA’)获取logger,另一个项目可能是用logging.getLogger(‘prjB’),日后还有新项 目呢!一想到要兼容这么多项目你就头大了。忍痛割爱,把print语句给删除掉吧,又怕曰后 出了问题_己都找不到bug,那还不是自己加班己苦。这个时候,不妨让python-message 来帮你,轻松改一下bar()函数。

import message

LOG_MSG = ('log', 'foo') def bar():

message.pub(LOG_MSG , 'Haha, Calling bar().') do_sth ()

在已有的项目中,只需要在项目开始处加上这样的代码,继续把日志放到标准输出。

import message import foo def handle_foo__log_msg(txt):

print (txt)

message,sub(foo.LOG_MSG, handle_foo_log_msg)

而在那个使用logging的新项目中,则这样修改:

>>> def handler_foo_log_msg(txt):
>>>     import logging
>>>     logging.debug(txt)

甚至在一些不关注底层库的日志项目中.直接无视就可以了。通过message,可以轻松 获得库与应用之间的解耦,因为库关注的是要有日志,而不关注日志输出到哪里;应用关注 的是日志要统一放置.但不关注谁往B志文件中输出内容,这正与发布订阅模式的应用场景 不谋而合。

除了简单的sub()/pub()之外,python-message还支持取消订阅(unsub())和中止消息传递。

import message def hello(name):

print ('hello %s' %name) ctx = message.Context() ctx.discontinued = True return ctx

def hi(name):

print ('u cannrt c me.')

message.sub('greet', hello) message.sub('greet', hi) message.pub('greet', 'lai')

python-message利用回调函数的返回值来实现取消消息传递, 非常巧妙(读者可以思考 一下为什么能够利用回调确数的返冋值)。 在上面这个例子中,运行后是看不到”u can’t c me.”这一行输出的.因为消息在调用hello()后就中止传递了(Broker使用list对象存储冋调闲数就是为了保证次序)。

python-message是同步调用回调函数的,也就是说谁先sub谁就先被调用。 大部分情况 下这样已经能够满足大分需求,但有时需要后sub的函数先被调用, 这时message.sub函数 通过一个默认参数来支持的f只需要简单地在调用sub的时候加上front=True, 这个回调函 数将被插到所有之前已经sub的回调函数之前:sub(‘greet’,hello,front=True)。

订阅/发布模式是观察者模式的超集,它不关注消息是谁发布的,也不关注消息由谁处 理。 但有时候我们也希望某个自己的类的也能够更方便地订阅/发布消息, 也就是想退化为 观察者模式,python-message同样提供了支持。如以下代码:

from message import observable def greet(people):

print ('hello. %s.' %people.name)

@observable class Foo(object):

def __init__(self, name):

print('Foo') self.name = name self.sub('greet', greet)

def pub_greet(self):

self.pub('greet', self)

foo =Foo('lai') foo.pub_greet()

python-message提供了类装饰函数observable(),任何class只需要通过它装饰一下就拥有 了 sub/unsub/pub/declare/retract等方法,它们的使用方法跟全局函数是类似的,在此不赞述。

因为python-message的消息i丁阅默认是全局性的,所以有可能产生名字冲突。 在减 少名字冲突方面,可以借鉴java/actionscript3的package起名策略, 比如在应用中定 义消息主题常量FOO=‘com.googlecode.python-message.FOO’, 这祥多个库同时定义 FOO常量也不容易冲突。除此之外,还有_招就是使用uuid,如下:

uuid = 'bd61825688ci72b345ce07057b2555719' FOO = uuid + 'F00'