1. 程式人生 > 程式設計 >詳解Python 實現 ZeroMQ 的三種基本工作模式

詳解Python 實現 ZeroMQ 的三種基本工作模式

簡介

引用官方說法:ZMQ(以下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個 socket library,他使得 Socket 程式設計更加簡單、簡潔和效能更高。

是一個訊息處理佇列庫,可在多個執行緒、核心和主機盒之間彈性伸縮。

ZMQ 的明確目標是“成為標準網路協議棧的一部分,之後進入 Linux 核心”。現在還未看到它們的成功。但是,它無疑是極具前景的、並且是人們更加需要的“傳統” BSD 套接字之上的一 層封裝。ZMQ 讓編寫高效能網路應用程式極為簡單和有趣。

它跟 RabbitMQ,ActiveMQ 之類有著相當本質的區別,ZeroMQ 根本就不是一個訊息佇列伺服器,更像是一組底層網路通訊庫,對原有的 Socket API 加上一層封裝,使我們操作更簡便。

三種工作模式

Request-Reply 模式:

說到“請求-應答”模式,不得不說的就是它的訊息流動模型。訊息流動模型指的是該模式下,必須嚴格遵守“一問一答”的方式。

發出訊息後,若沒有收到回覆,再發出第二條訊息時就會丟擲異常。同樣的,對於 Rep 也是,在沒有接收到訊息前,不允許發出訊息。

基於此構成“一問一答”的響應模式。

server:

# -*- coding=utf-8 -*-

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
 message = socket.recv()
 print("Received: %s" % message)
 socket.send("I am OK!")

client:

# -*- coding=utf-8 -*-

import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

socket.send('Are you OK?')
response = socket.recv()
print("response: %s" % response)

Publish-Subscribe 模式:

“釋出-訂閱”模式下,“釋出者”繫結一個指定的地址,例如“192.168.10.1:5500”,“訂閱者”連線到該地址。該模式下訊息流是單向的,只允許從“釋出者”流向“訂閱者”。且“釋出者”只管發訊息,不理會是否存在“訂閱者”。一個“釋出者”可以擁有多個訂閱者,同樣的,一個“訂閱者”也可訂閱多個釋出者。

雖然我們知道“釋出者”在傳送訊息時是不關心“訂閱者”的存在於否,所以先啟動“釋出者”,再啟動“訂閱者”是很容易導致部分訊息丟失的。那麼可能會提出一個說法“我先啟動‘訂閱者',再啟動‘釋出者',就能解決這個問題了?”

對於 ZeroMQ 而言,這種做法也並不能保證 100% 的可靠性。在 ZeroMQ 領域中,有一個叫做“慢木匠”的術語,就是說即使我是先啟動了“訂閱者”,再啟動“釋出者”,“訂閱者”總是會丟失第一批資料。因為在“訂閱者”與端點建立 TCP 連線時,會包含幾毫秒的握手時間,雖然時間短,但是是存在的。再加上 ZeroMQ 後臺 IO 是以一部方式執行的,所以若不在雙方之間施加同步策略,訊息丟失是不可避免的。

關於“釋出-訂閱”模式在 ZeroMQ 中的一些其他特點:

  • 公平排隊,一個“訂閱者”連線到多個釋出者時,會均衡的從每個“釋出者”讀取訊息,不會出現一個“釋出者”淹沒其他“釋出者”的情況。
  • ZMQ3.0 以上的版本,過濾規則發生在“釋出方”。 ZMQ3.0 以下的版本,過濾規則發生在“訂閱方”。其實也就是處理訊息的位置。

server:

# -*- coding=utf-8 -*-

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")

for i in range(10):
 print('send message...' + str(i))
 socket.send('message' + str(i))
 time.sleep(1)

client:

# -*- coding=utf-8 -*-

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,'')
while True:
 response = socket.recv()
 print("response: %s" % response)

Parallel Pipeline 模式:

在說明“管道模式”前,需要明確的是在 ZeroMQ 中並沒有絕對的服務端與客戶端之分,所有的資料接收與傳送都是以連線為單位的,只區分 ZeroMQ 定義的型別。就像套接字繫結地址時,可以使用 bind ,也可以使用 connect ,只是通常我們將理解中的服務端 bind 到一個地址,而理解中的客戶端 connec 到該地址。

“管道模式”一般用於任務分發與結果收集,由一個任務發生器來產生任務,“公平”的派發到其管轄下的所有 worker,完成後再由結果收集器來回收任務的執行結果。

整體流程比較好理解,worker 連線到任務發生器上,等待任務的產生,完成後將結果傳送至結果收集器。如果要以客戶端服務端的概念來區分,這裡的任務發生器與結果收集器是服務端,而 worker 是客戶端。

前面說到了這裡任務的派發是“公平的”,因為內部採用了 LRU 的演算法來找到最近最久未工作的閒置 worker。但是公平在這裡是相對的,當任務發生器啟動後,第一個連線到它的 worker 會在一瞬間承受整個任務發生器產生的 tasks。

總結來說由三部分組成,push 進行資料推送,work 進行資料快取,pull 進行資料競爭獲取處理。區別於 Publish-Subscribe 存在一個數據快取和處理負載。

當連線被斷開,資料不會丟失,重連後資料繼續傳送到對端。

server:

# -*- coding=utf-8 -*-

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

for i in range(10):
 socket.send('message' + str(i))
 # 沒啟 worker 時不會發訊息
 print('send message...' + str(i))
 time.sleep(1)

work:

# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
receive = context.socket(zmq.PULL)
receive.connect('tcp://127.0.0.1:5557')

sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')

while True:
 data = receive.recv()
 print('transform...' + data)
 sender.send(data)

client:

# -*- coding=utf-8 -*-
import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")

while True:
 response = socket.recv()
 print("response: %s" % response)

以上。

參考文件:

https://www.jb51.net/article/177043.htm

總結

到此這篇關於詳解Python 實現 ZeroMQ 的三種基本工作模式的文章就介紹到這了,更多相關python ZeroMQ工作模式內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!