分散式進階 十五 ZMQ
我們為什麼需要ZMQ
目前的應用程式很多都會包含跨網路的元件,無論是區域網還是因特網。這些程式的開發者都會用到某種訊息通訊機制。有些人會使用某種訊息佇列產品,而大多數人則會自己手工來做這些事,使用TCP或UDP協議。這些協議使用起來並不困難,但是,簡單地將訊息從A發給B,和在任何情況下都能進行可靠的訊息傳輸,這兩種情況顯然是不同的。
讓我們看看在使用純TCP協議進行訊息傳輸時會遇到的一些典型問題。任何可複用的訊息傳輸層肯定或多或少地會要解決以下問題:
如何處理I/O?是讓程式阻塞等待響應,還是在後臺處理這些事?這是軟體設計的關鍵因素。阻塞式的I/O操作會讓程式架構難以擴充套件,而後臺處理
如何處理那些臨時的、來去自由的元件?我們是否要將元件分為客戶端和服務端兩種,並要求服務端永不消失?那如果我們想要將服務端相連怎麼辦?我們要每隔幾秒就進行重連嗎?
我們如何表示一條訊息?我們怎樣通過拆分訊息,讓其變得易讀易寫,不用擔心快取溢位,既能高效地傳輸小訊息,又能勝任視訊等大型檔案的傳輸?
如何處理那些不能立刻傳送出去的訊息?比如我們需要等待一個網路元件重新連線的時候?我們是直接丟棄該條訊息,還是將它存入資料庫,或是記憶體中的一個佇列?
要在哪裡儲存訊息佇列?如果某個元件讀取訊息佇列的速度很慢,造成訊息的堆積怎麼辦?我們要採取什麼樣的策略?
如何處理丟失的訊息?我們是等待新的資料,請求重發,還是需要建立一套新的可靠性機制以保證訊息不會丟失?如果這個機制自身崩潰了呢?
如果我們想換一種網路連線協議,如用廣播代替TCP單播?或者改用IPv6?我們是否需要重寫所有的應用程式,或者將這種協議抽象到一個單獨的層中?
我們如何對訊息進行路由?我們可以將訊息同時傳送給多個節點嗎?是否能將應答訊息返回給請求的傳送方?
我們如何為另一種語言寫一個API?我們是否需要完全重寫某項協議,還是重新打包一個類庫?
怎樣才能做到在不同的架構之間傳送訊息?是否需要為訊息規定一種編碼?
我們如何處理網路通訊錯誤?等待並重試,還是直接忽略或取消?
ZMQ就是這樣一種軟體:它高效,提供了嵌入式的類庫,使應用程式能夠很好地在網路中擴充套件,成本低廉。
ZMQ的主要特點有:
ZMQ會在後臺執行緒非同步地處理I/O操作,它使用一種不會死鎖的資料結構來儲存訊息。
網路元件可以來去自如,ZMQ會負責自動重連,這就意味著你可以以任何順序啟動元件;用它建立的面向服務架構(SOA)中,服務端可以隨意地加入或退出網路。
ZMQ會在有必要的情況下自動將訊息放入佇列中儲存,一旦建立了連線就開始傳送。
ZMQ有閾值(HWM)的機制,可以避免訊息溢位。當佇列已滿,ZMQ會自動阻塞傳送者,或丟棄部分訊息,這些行為取決於你所使用的訊息模式。
ZMQ可以讓你用不同的通訊協議進行連線,如TCP、廣播、程序內、程序間。改變通訊協議時你不需要去修改程式碼。
ZMQ會恰當地處理速度較慢的節點,會根據訊息模式使用不同的策略。
ZMQ提供了多種模式進行訊息路由,如請求-應答模式、釋出-訂閱模式等。這些模式可以用來搭建網路拓撲結構。
ZMQ中可以根據訊息模式建立起一些中間裝置(很小巧),可以用來降低網路的複雜程度。
ZMQ會發送整個訊息,使用訊息幀的機制來傳遞。如果你傳送了10KB大小的訊息,你就會收到10KB大小的訊息。
ZMQ不強制使用某種訊息格式,訊息可以是0位元組的,或是大到GB級的資料。當你表示這些訊息時,可以選用諸如谷歌的protocol buffers,XDR等序列化產品。
ZMQ能夠智慧地處理網路錯誤,有時它會進行重試,有時會告知你某項操作發生了錯誤。
ZMQ甚至可以降低對環境的汙染,因為節省了CPU時間意味著節省了電能。
其實ZMQ可以做的還不止這些,它會顛覆人們編寫網路應用程式的模式。雖然從表面上看,它不過是提供了一套處理套接字的API,能夠用zmq_recv()和zmq_send()進行訊息的收發,但是,訊息處理將成為應用程式的核心部分,很快你的程式就會變成一個個訊息處理模組,這既美觀又自然。它的擴充套件性還很強,每項任務由一個節點(節點是一個執行緒)、同一臺機器上的兩個節點(節點是一個程序)、同一網路上的兩臺機器(節點是一臺機器)來處理,而不需要改動應用程式。
一、ZeroMQ的背景介紹
官方:“ZMQ(以下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個socket library,他使得Socket程式設計更加簡單、簡潔和效能更高。是一個訊息處理佇列庫,可在多個執行緒、核心和主機盒之間彈性伸縮。ZMQ的明確目標是“成為標準網路協議棧的一部分,之後進入Linux核心”。現在還未看到它們的成功。但是,它無疑是極具前景的、並且是人們更加需要的“傳統”BSD套接字之上的一層封裝。ZMQ讓編寫高效能網路應用程式極為簡單和有趣。”
與其他訊息中介軟體相比,ZMQ並不像是一個傳統意義上的訊息佇列伺服器,事實上,它也根本不是一個伺服器,它更像是一個底層的網路通訊庫,在Socket API之上做了一層封裝,將網路通訊、程序通訊和執行緒通訊抽象為統一的API介面。
二、ZMQ是什麼
閱讀了ZMQ的Guide文件後,我的理解是,這是個類似於Socket的一系列介面,他跟Socket的區別是:普通的socket是端到端的(1:1的關係),而ZMQ卻是可以N:M 的關係,人們對BSD套接字的瞭解較多的是點對點的連線,點對點連線需要顯式地建立連線、銷燬連線、選擇協議(TCP/UDP)和處理錯誤等,而ZMQ遮蔽了這些細節,讓你的網路程式設計更為簡單。ZMQ用於node與node間的通訊,node可以是主機或者是程序。
三、三種模型
參考網址:http://blog.csdn.net/whuqin/article/details/8442919/
a) 應答模式:
使用REQ-REP套接字傳送和接受訊息是需要遵循一定規律的。客戶端首先使用zmq_send()傳送訊息,再用zmq_recv()接收,如此迴圈。如果打亂了這個順序(如連續傳送兩次)則會報錯。類似地,服務端必須先進行接收,後進行傳送。
b) 訂閱釋出模式
PUB-SUB套接字組合是非同步的。客戶端在一個迴圈體中使用recv ()接收訊息,如果向SUB套接字傳送訊息則會報錯;類似地,服務端可以不斷地使用send ()傳送訊息,但不能再PUB套接字上使用recv ()。
關於PUB-SUB套接字,還有一點需要注意:你無法得知SUB是何時開始接收訊息的。就算你先打開了SUB套接字,後開啟PUB傳送訊息,這時SUB還是會丟失一些訊息的,因為建立連線是需要一些時間的。很少,但並不是零。解決此問題需要在PUB端加入sleep。
c) 基於分散式處理(管道模式)
這篇部落格對ZMQ有一個初步的介紹,下篇部落格介紹如何通過JAVA來呼叫ZMQ實現訊息處理。
下面貼出PUB_SUB(應答模式)模式下的程式碼:
釋出端:
package cn.edu.ujn.pub_sub;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Pubsub envelope publisher
*/
public class psenvpub {
public static void main (String[] args) throws Exception {
// Prepare our context and publisher
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5563");
while (!Thread.currentThread ().isInterrupted ()) {
// Write two messages, each with an envelope and content
publisher.sendMore ("A");
publisher.send ("We don't want to see this");
publisher.sendMore ("B");
publisher.send("We would like to see this");
}
publisher.close ();
context.term ();
}
}
釋出端需要通過context.socket(ZMQ.PUB)表示為釋出端,通過bind方法來建立釋出端連線,等待訂閱者連線。
之後通過send方法將資料傳送到出去。
之後來寫訂閱端程式碼
package cn.edu.ujn.pub_sub;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Pubsub envelope subscriber
*/
public class psenvsub {
public static void main (String[] args) {
// Prepare our context and subscriber
Context context = ZMQ.context(1);
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://localhost:5563");
subscriber.subscribe("B".getBytes());
while (!Thread.currentThread ().isInterrupted ()) {
// Read envelope with address
String address = subscriber.recvStr ();
// Read message contents
String contents = subscriber.recvStr ();
System.out.println(address + " : " + contents);
}
subscriber.close ();
context.term ();
}
}
客戶端通過connect進行連線,之後通過recv來進行資料接收。
下面貼出REQ_REP(訂閱釋出模式)模式下的程式碼:
傳送端:
package cn.edu.ujn.req_rep;
//
// Hello World server in Java
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
import org.zeromq.ZMQ;
public class hwserver {
private static int i = 0;
public static void main(String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to clients
ZMQ.Socket responder = context.socket(ZMQ.REP);
responder.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
// Wait for next request from the client
byte[] request = responder.recv(0);
System.out.println("Received " + new String(request) + i++);
// Do some 'work'
Thread.sleep(1000);
// Send reply back to client
String reply = "World";
responder.send(reply.getBytes(), 0);
}
responder.close();
context.term();
}
}
接收端:
package cn.edu.ujn.req_rep;
//
// Hello World client in Java
// Connects REQ socket to tcp://localhost:5555
// Sends "Hello" to server, expects "World" back
//
import org.zeromq.ZMQ;
public class hwclient {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to server
System.out.println("Connecting to hello world server…");
ZMQ.Socket requester = context.socket(ZMQ.REQ);
requester.connect("tcp://localhost:5555");
for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
String request = "Hello";
System.out.println("Sending Hello " + requestNbr);
requester.send(request.getBytes(), 0);
byte[] reply = requester.recv(0);
System.out.println("Received " + new String(reply) + " " + requestNbr);
}
requester.close();
context.term();
}
}
下面貼出Para_Pipe(基於分散式處理(管道模式))模式下的程式碼:
傳送端:
package cn.edu.ujn.para_pipe;
import java.util.Random;
import org.zeromq.ZMQ;
//
// Task ventilator in Java
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
//
public class taskvent {
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to send messages on
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");
// Socket to send messages on
ZMQ.Socket sink = context.socket(ZMQ.PUSH);
sink.connect("tcp://localhost:5558");
System.out.println("Press Enter when the workers are ready: ");
System.in.read();
System.out.println("Sending tasks to workers\n");
// The first message is "0" and signals start of batch
sink.send("0", 0);
// Initialize random number generator
Random srandom = new Random(System.currentTimeMillis());
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = srandom.nextInt(100) + 1;
total_msec += workload;
System.out.print(workload + ".");
String string = String.format("%d", workload);
sender.send(string, 0);
}
System.out.println("Total expected cost: " + total_msec + " msec");
Thread.sleep(1000); // Give 0MQ time to deliver
sink.close();
sender.close();
context.term();
}
}
中介端:
package cn.edu.ujn.para_pipe;
import org.zeromq.ZMQ;
//
// Task worker in Java
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
//
public class taskwork {
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to receive messages on
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.connect("tcp://localhost:5557");
// Socket to send messages to
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.connect("tcp://localhost:5558");
// Process tasks forever
while (!Thread.currentThread ().isInterrupted ()) {
String string = new String(receiver.recv(0)).trim();
long msec = Long.parseLong(string);
// Simple progress indicator for the viewer
System.out.flush();
System.out.print(string + '.');
// Do the work
Thread.sleep(msec);
// Send results to sink
sender.send("".getBytes(), 0);
}
sender.close();
receiver.close();
context.term();
}
}
接收端:
package cn.edu.ujn.para_pipe;
import org.zeromq.ZMQ;
//
// Task sink in Java
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
//
public class tasksink {
public static void main (String[] args) throws Exception {
// Prepare our context and socket
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.bind("tcp://*:5558");
// Wait for start of batch
String string = new String(receiver.recv(0));
// Start our clock now
long tstart = System.currentTimeMillis();
// Process 100 confirmations
int task_nbr;
int total_msec = 0; // Total calculated cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
string = new String(receiver.recv(0)).trim();
if ((task_nbr / 10) * 10 == task_nbr) {
System.out.print(":");
} else {
System.out.print(".");
}
}
// Calculate and report duration of batch
long tend = System.currentTimeMillis();
System.out.println("\nTotal elapsed time: " + (tend - tstart) + " msec");
receiver.close();
context.term();
}
}
到此為止釋出訊息的三種模式就寫完了,希望通過這篇部落格讀者能夠對ZMQ有初步的認識和簡單實用,希望這篇部落格對學習zmq的讀者有所幫助。
正確地使用上下文
ZMQ應用程式的一開始總是會先建立一個上下文,並用它來建立套接字。在C語言中,建立上下文的函式是zmq_init()。一個程序中只應該建立一個上下文。從技術的角度來說,上下文是一個容器,包含了該程序下所有的套接字,併為inproc協議提供實現,用以高速連線程序內不同的執行緒。如果一個程序中建立了兩個上下文,那就相當於啟動了兩個ZMQ例項。如果這正是你需要的,那沒有問題,但一般情況下:
在一個程序中使用zmq_init()函式建立一個上下文,並在結束時使用zmq_term()函式關閉它。
如果你使用了fork()系統呼叫,那每個程序需要自己的上下文物件。如果在呼叫fork()之前呼叫了zmq_init()函式,那每個子程序都會有自己的上下文物件。通常情況下,你會需要在子程序中做些有趣的事,而讓父程序來管理它們。
正確地退出和清理
程式設計師的一個良好習慣是:總是在結束時進行清理工作。當你使用像Python那樣的語言編寫ZMQ應用程式時,系統會自動幫你完成清理。但如果使用的是C語言,那就需要小心地處理了,否則可能發生記憶體洩露、應用程式不穩定等問題。
記憶體洩露只是問題之一,其實ZMQ是很在意程式的退出方式的。箇中原因比較複雜,但簡單的來說,如果仍有套接字處於開啟狀態,呼叫zmq_term()時會導致程式掛起;就算關閉了所有的套接字,如果仍有訊息處於待發送狀態,zmq_term()也會造成程式的等待。只有當套接字的LINGER選項設為0時才能避免。
我們需要關注的ZMQ物件包括:訊息、套接字、上下文。好在內容並不多,至少在一般的應用程式中是這樣:
處理完訊息後,記得用zmq_msg_close()函式關閉訊息;
如果你同時開啟或關閉了很多套接字,那可能需要重新規劃一下程式的結構了;
退出程式時,應該先關閉所有的套接字,最後呼叫zmq_term()函式,銷燬上下文物件。
警告:你的想法可能會被顛覆!
傳統網路程式設計的一個規則是套接字只能和一個節點建立連線。雖然也有廣播的協議,但畢竟是第三方的。當我們認定“一個套接字 = 一個連線”的時候,我們會用一些特定的方式來擴充套件應用程式架構:我們為每一塊邏輯建立執行緒,該執行緒獨立地維護一個套接字。
但在ZMQ的世界裡,套接字是智慧的、多執行緒的,能夠自動地維護一組完整的連線。你無法看到它們,甚至不能直接操縱這些連線。當你進行訊息的收發、輪詢等操作時,只能和ZMQ套接字打交道,而不是連線本身。所以說,ZMQ世界裡的連線是私有的,不對外部開放,這也是ZMQ易於擴充套件的原因之一。
由於你的程式碼只會和某個套接字進行通訊,這樣就可以處理任意多個連線,使用任意一種網路協議。而ZMQ的訊息模式又可以進行更為廉價和便捷的擴充套件。
這樣一來,傳統的思維就無法在ZMQ的世界裡應用了。在你閱讀示例程式程式碼的時候,也許你腦子裡會想方設法地將這些程式碼和傳統的網路程式設計相關聯:當你讀到“套接字”的時候,會認為它就表示與另一個節點的連線——這種想法是錯誤的;當你讀到“執行緒”時,會認為它是與另一個節點的連線——這也是錯誤的。
如果你是第一次閱讀本指南,使用ZMQ進行了一兩天的開發(或者更長),可能會覺得疑惑,ZMQ怎麼會讓事情便得如此簡單。你再次嘗試用以往的思維去理解ZMQ,但又無功而返。最後,你會被ZMQ的理念所折服,撥雲見霧,開始享受ZMQ帶來的樂趣。
若朋友們有疑問,可留言,以求共進!
再分享一下我老師大神的人工智慧教程吧。零基礎!通俗易懂!風趣幽默!還帶黃段子!希望你也加入到我們人工智慧的隊伍中來!https://www.cnblogs.com/captainbed