訊息佇列-zmq常用通訊模式
阿新 • • 發佈:2019-02-05
zmq是一個訊息佇列。可以在程序內、程序間、TCP、多播中,以訊息為單位傳輸資料,而不是socket的位元組流。官方主頁上有下載、使用說明與文件,相當齊全。
常用模式有:Request-Reply、Publish-Subscribe、Parallel Pipeline。
Request-Reply
request
zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REQ); //Send the request zmq::message_t request(6); memcpy ((void *)request.data(), "Hello", 5); socket.send(request); //Get the reply zmq::message_t reply; socket.recv(&reply);
server
// Reply server: create a context with one I/O thread and a REP socket bound to TCP port 5555.
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REP);
socket.bind ("tcp://*:5555");
while (true) {
    zmq::message_t request;
    // Wait for next request from client
    socket.recv (&request);
    std::cout << "Received Hello" << std::endl;
    // Do some 'work'
    sleep (1);
    // Send reply back to client: the 5 bytes of "World", no terminator
    zmq::message_t reply (5);
    memcpy (reply.data (), "World", 5);
    socket.send (reply);
}
Publish-Subscribe
publisher
// Publisher: one I/O thread, a PUB socket bound to TCP port 5556.
zmq::context_t context{1};
zmq::socket_t publisher{context, ZMQ_PUB};
publisher.bind("tcp://*:5556");
// NOTE(review): `message` is not defined in this snippet; presumably a
// zmq::message_t prepared by surrounding code that is not shown.
publisher.send(message);
subscriber
// Subscriber: create a SUB socket and connect it to the publisher endpoint on port 5556.
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
// Register the subscription filter; the filter used here is the empty string (length 0).
const char *filter = "";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
// Receive one update from the publisher.
zmq::message_t update;
subscriber.recv(&update);
Parallel Pipeline
ventilator
// Ventilator: signals the sink that a batch is starting, then pushes a task into the pipeline.
zmq::context_t context (1);
// Socket to send messages on
zmq::socket_t sender(context, ZMQ_PUSH);
sender.bind("tcp://*:5557");
// Notify the sink to start processing tasks
zmq::socket_t sink(context, ZMQ_PUSH);
sink.connect("tcp://localhost:5558");
zmq::message_t message(2);
// Copy the literal together with its terminating NUL so both payload bytes are initialized.
memcpy(message.data(), "0", 2);
sink.send(message);
// Start sending data into the pipeline
message.rebuild(10);
// snprintf bounds the write to the message buffer; an unbounded sprintf could
// overrun the 10 bytes for a large or negative value.
// NOTE(review): `workload` is not defined in this snippet; presumably an int
// supplied by surrounding code that is not shown.
snprintf(static_cast<char *>(message.data()), message.size(), "%d", workload);
sender.send(message);
worker
// Worker: pulls tasks from port 5557 and pushes a result message to port 5558.
zmq::context_t context(1);
// Socket to receive messages on
zmq::socket_t receiver(context, ZMQ_PULL);
receiver.connect("tcp://localhost:5557");
// Socket to send messages to
zmq::socket_t sender(context, ZMQ_PUSH);
sender.connect("tcp://localhost:5558");
// Process tasks forever
while (1) {
    // Declared per iteration; the snippet previously used `message` without defining it.
    zmq::message_t message;
    receiver.recv(&message);
    // Send results to sink: rebuild() with no arguments re-initializes the
    // message, so the received payload is not forwarded.
    message.rebuild();
    sender.send(message);
}
sink
// Sink: set up the context and a PULL socket bound to TCP port 5558.
zmq::context_t context{1};
zmq::socket_t receiver{context, ZMQ_PULL};
receiver.bind("tcp://*:5558");
// The same message object is reused for both receives below.
zmq::message_t message;
// First receive: wait for the start-of-batch signal
receiver.recv(&message);
// Second receive: a message from a worker
receiver.recv(&message);