ZeroMQ“釋出/訂閱”模型的C++程式碼
阿新 • • 發佈:2018-12-12
ZeroMQ環境的搭建就不說了,之前已經說過。
來看ZeroMQ的“釋出/訂閱”模型的C++程式碼:
pub.cpp程式碼為:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include "zmq.h" int main() { void* context = zmq_ctx_new(); assert(context != NULL); void* publisher = zmq_socket(context, ZMQ_PUB); assert(publisher != NULL); int ret = zmq_bind(publisher, "tcp://*:5555"); assert(ret == 0); int i = 0; while(1) { char szBuf[1024] = {0}; snprintf(szBuf, sizeof(szBuf), "server i=%d", i); ret = zmq_send(publisher, szBuf, strlen(szBuf) + 1, 0); i++; sleep(1); } zmq_close (publisher); zmq_ctx_destroy (context); return 0; }
sub.cpp程式碼為:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include "zmq.h" int main() { printf("Hello world!\n"); void* context = zmq_ctx_new(); assert(context != NULL); void* subscriber = zmq_socket(context, ZMQ_SUB); assert(subscriber != NULL); int ret = zmq_connect(subscriber, "tcp://localhost:5555"); assert(ret == 0); ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0); assert(ret == 0); while(1) { printf("into while\n"); char szBuf[1024] = {0}; ret = zmq_recv(subscriber, szBuf, sizeof(szBuf) - 1, 0); if (ret > 0) { printf("%s\n", szBuf); } } zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }
先啟動釋出者的話, 訂閱者會錯過部分訊息,無法挽回。
來看看實際效果(開啟了1個釋出者和2個訂閱者):
[email protected]:~/taoge/zmq/test$ ./server
[email protected]:~/taoge/zmq/test$ ./client
Hello world!
into while
server i=21
into while
server i=22
into while
server i=23
[email protected]:~/taoge/zmq/test$ ./client Hello world! into while server i=57 into while server i=58 into while server i=59
OK, 不多說。