W3Cschool
恭喜您成為首批注冊用戶
獲得88經(jīng)驗值獎勵
You have a program based on communicating threads and want them to implementpublish/subscribe messaging.
To implement publish/subscribe messaging, you typically introduce a separate “ex‐change” or “gateway” object that acts as an intermediary for all messages. That is, insteadof directly sending a message from one task to another, a message is sent to the exchangeand it delivers it to one or more attached tasks. Here is one example of a very simpleexchange implementation:
from collections import defaultdict
class Exchange:def init(self):self._subscribers = set()def attach(self, task):self._subscribers.add(task)def detach(self, task):self._subscribers.remove(task)def send(self, msg):for subscriber in self._subscribers:subscriber.send(msg)
return _exchanges[name]
An exchange is really nothing more than an object that keeps a set of active subscribersand provides methods for attaching, detaching, and sending messages. Each exchangeis identified by a name, and the get_exchange() function simply returns the Exchange instance associated with a given name.Here is a simple example that shows how to use an exchange:
class Task:
...def send(self, msg):
...
task_a = Task()task_b = Task()
Although there are many different variations on this theme, the overall idea is the same.Messages will be delivered to an exchange and the exchange will deliver them to attachedsubscribers.
The concept of tasks or threads sending messages to one another (often via queues) iseasy to implement and quite popular. However, the benefits of using a public/subscribe(pub/sub) model instead are often overlooked.First, the use of an exchange can simplify much of the plumbing involved in setting upcommunicating threads. Instead of trying to wire threads together across multiple pro‐gram modules, you only worry about connecting them to a known exchange. In somesense, this is similar to how the logging library works. In practice, it can make it easierto decouple various tasks in the program.Second, the ability of the exchange to broadcast messages to multiple subscribers opensup new communication patterns. For example, you could implement systems with re‐dundant tasks, broadcasting, or fan-out. You could also build debugging and diagnostictools that attach themselves to exchanges as ordinary subscribers. For example, here isa simple diagnostic class that would display sent messages:
class DisplayMessages:def init(self):self.count = 0def send(self, msg):self.count += 1print(‘msg[{}]: {!r}'.format(self.count, msg))
exc = get_exchange(‘name')d = DisplayMessages()exc.attach(d)
Last, but not least, a notable aspect of the implementation is that it works with a varietyof task-like objects. For example, the receivers of a message could be actors (as describedin Recipe 12.10), coroutines, network connections, or just about anything that imple‐ments a proper send() method.One potentially problematic aspect of an exchange concerns the proper attachment anddetachment of subscribers. In order to properly manage resources, every subscriber thatattaches must eventually detach. This leads to a programming model similar to this:
exc = get_exchange(‘name')exc.attach(some_task)try:
...
finally:exc.detach(some_task)
In some sense, this is similar to the usage of files, locks, and similar objects. Experiencehas shown that it is quite easy to forget the final detach() step. To simplify this, youmight consider the use of the context-management protocol. For example, adding asubscribe() method to the exchange like this:
from contextlib import contextmanagerfrom collections import defaultdict
class Exchange:def init(self):self._subscribers = set()def attach(self, task):self._subscribers.add(task)def detach(self, task):self._subscribers.remove(task)
@contextmanagerdef subscribe(self, *tasks):
for task in tasks:self.attach(task)try:yieldfinally:for task in tasks:self.detach(task)
def send(self, msg):for subscriber in self._subscribers:subscriber.send(msg)
return _exchanges[name]
...exc.send(‘msg1')exc.send(‘msg2')...
Finally, it should be noted that there are numerous possible extensions to the exchangeidea. For example, exchanges could implement an entire collection of message channels
or apply pattern matching rules to exchange names. Exchanges can also be extendedinto distributed computing applications (e.g., routing messages to tasks on differentmachines, etc.).
Copyright©2021 w3cschool編程獅|閩ICP備15016281號-3|閩公網(wǎng)安備35020302033924號
違法和不良信息舉報電話:173-0602-2364|舉報郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號
聯(lián)系方式:
更多建議: