1. 程式人生 > >ZeroMQ“釋出/訂閱”模型的C++程式碼

ZeroMQ“釋出/訂閱”模型的C++程式碼

     ZeroMQ環境的搭建就不說了,之前已經說過。

     來看ZeroMQ的“釋出/訂閱”模型的C++程式碼:

     pub.cpp程式碼為:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include "zmq.h"

int main()
{
    void* context = zmq_ctx_new();
    assert(context != NULL);

    void* publisher = zmq_socket(context, ZMQ_PUB);
    assert(publisher != NULL);

    int ret = zmq_bind(publisher, "tcp://*:5555");
    assert(ret == 0);

    int i = 0;
    while(1)
    {
        char szBuf[1024] = {0};
        snprintf(szBuf, sizeof(szBuf), "server i=%d", i);
        ret = zmq_send(publisher, szBuf, strlen(szBuf) + 1, 0);
        i++;

        sleep(1);
    }

    zmq_close (publisher);
    zmq_ctx_destroy (context);

    return 0;
}

        sub.cpp程式碼為:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include "zmq.h"

int main()
{
    printf("Hello world!\n");

    void* context = zmq_ctx_new();
    assert(context != NULL);

    void* subscriber = zmq_socket(context, ZMQ_SUB);
    assert(subscriber != NULL);

    int ret = zmq_connect(subscriber, "tcp://localhost:5555");
    assert(ret == 0);

    ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(ret == 0);

    while(1)
    {
        printf("into while\n");
        char szBuf[1024] = {0};
        ret = zmq_recv(subscriber, szBuf, sizeof(szBuf) - 1, 0);
        if (ret > 0)
        {
            printf("%s\n", szBuf);
        }
    }

    zmq_close(subscriber);
    zmq_ctx_destroy(context);

    return 0;
}

        先啟動釋出者的話, 訂閱者會錯過部分訊息,無法挽回。

        來看看實際效果(開啟了1個釋出者和2個訂閱者):

[email protected]:~/taoge/zmq/test$ ./server 
[email protected]:~/taoge/zmq/test$ ./client 
Hello world!
into while
server i=21
into while
server i=22
into while
server i=23
[email protected]:~/taoge/zmq/test$ ./client 
Hello world!
into while
server i=57
into while
server i=58
into while
server i=59

        OK, 不多說。