redis的釋出訂閱模式pubsub
阿新 • • 發佈:2022-05-02
前言
redis支援釋出訂閱模式,在這個實現中,傳送者(傳送資訊的客戶端)不是將資訊直接傳送給特定的接收者(接收資訊的客戶端),而是將資訊傳送給頻道(channel),然後由頻道將資訊轉發給所有對這個頻道感興趣的訂閱者。
傳送者無須知道任何關於訂閱者的資訊,而訂閱者也無須知道是那個客戶端給它傳送資訊,它只要關注自己感興趣的頻道即可。
對釋出者和訂閱者進行解構(decoupling),可以極大地提高系統的擴充套件性(scalability),並得到一個更動態的網路拓撲(network topology)。
redis 釋出訂閱主要由三個entity組成:channel/subscriber/publisher。
channel:
頻道有兩種型別
明確型,news.sport,體育類新聞
模糊型,news.*,各種新聞
下面實現對於這兩種是透明的。
# -*- coding:utf-8 -*- class Channel(object): def __init__(self, channel=''): self.channel = channel def __str__(self): return self.channel class ChannelFactory(object): def __init__(self, *channels): if isinstance(channels[0], (tuple, list)): self.channel_list = [Channel(channel) for channel in channels[0]] self.channel_list = [Channel(channel) for channel in channels] def get_channels(self): return self.channel_list
user:
主要有兩類,訂閱者subscriber和釋出者publisher,他們都繼承自Pubsub,由繼承關係實現:
# -*- coding:utf-8 -*- import redis class Pubsub(object): def __init__(self, redis_config): pool = redis.ConnectionPool( host=redis_config.get('host'), port=redis_config.get('port'), db=redis_config.get('db') ) self.redis = redis.StrictRedis(connection_pool=pool) class Subscriber(Pubsub): def __init__(self, redis_config): Pubsub.__init__(self, redis_config=redis_config) self.pubsub = self.redis.pubsub() def subscribe(self, *channel): self.pubsub.subscribe(*channel) def psubscribe(self, *channel_pattern): self.pubsub.psubscribe(*channel_pattern) def listen(self): for item in self.pubsub.listen(): yield item def unsubscribe(self, *channel): self.pubsub.unsubscribe(*channel) def punsubscribe(self, *channel_pattern): self.pubsub.unsubscribe(*channel_pattern) class Publisher(Pubsub): def __init__(self, redis_config): Pubsub.__init__(self, redis_config=redis_config) def publish(self, channel, message): self.redis.publish(channel, message)
測試
分兩部分,訂閱程序和釋出程序
訂閱程序:
from config import redis as redis_config
from subscriber import Subscriber
from channel import ChannelFactory
if __name__ == '__main__':
channel = ChannelFactory('news.*', 'movie.*').get_channels()
sub = Subscriber(redis_config)
sub.psubscribe(channel)
for item in sub.listen():
print item
釋出程序:
from config import redis as redis_config
from publisher import Publisher
from channel import Channel
if __name__ == '__main__':
channel = Channel('news.abc')
pub = Publisher(redis_config)
pub.publish(channel, 'aaaaaaaa')