ZeroMQ介面函式之:zmq_socket_monitor
阿新 • • 發佈:2018-12-30
1 #include <stdio.h>
2 #include <zmq.h>
3 #include <pthread.h>
4 #include <string.h>
5 #include <assert.h>
6
7 static int read_msg(void* s, zmq_event_t* event, char* ep)
8 {
9 int rc ;
10 zmq_msg_t msg1; // binary part
11 zmq_msg_init (&msg1);
12 zmq_msg_t msg2; // address part
13 zmq_msg_init (&msg2);
14 rc = zmq_msg_recv (&msg1, s, 0);
15 if (rc == -1 && zmq_errno() == ETERM)
16 return 1 ;
17 assert (rc != -1);
18 assert (zmq_msg_more(&msg1) != 0);
19 rc = zmq_msg_recv (&msg2, s, 0 );
20 if (rc == -1 && zmq_errno() == ETERM)
21 return 1;
22 assert (rc != -1);
23 assert (zmq_msg_more(&msg2) == 0);
24 // copy binary data to event struct
25 const char* data = (char*)zmq_msg_data(&msg1);
26 memcpy(&(event->event), data, sizeof (event->event));
27 memcpy(&(event->value), data+sizeof(event->event), sizeof(event->value));
28 // copy address part
29 const size_t len = zmq_msg_size(&msg2) ;
30 ep = memcpy(ep, zmq_msg_data(&msg2), len);
31 *(ep + len) = 0 ;
32 return 0 ;
33 }
34
35 // REP socket monitor thread
36 static void *rep_socket_monitor (void *ctx)
37 {
38 zmq_event_t event;
39 static char addr[1025] ;
40 int rc;
41 printf("starting monitor...\n");
42 void *s = zmq_socket (ctx, ZMQ_PAIR);
43 assert (s);
44 rc = zmq_connect (s, "inproc://monitor.rep");
45 assert (rc == 0);
46 while (!read_msg(s, &event, addr)) {
47 switch (event.event) {
48 case ZMQ_EVENT_LISTENING:
49 printf ("listening socket descriptor %d\n", event.value);
50 printf ("listening socket address %s\n", addr);
51 break;
52 case ZMQ_EVENT_ACCEPTED:
53 printf ("accepted socket descriptor %d\n", event.value);
54 printf ("accepted socket address %s\n", addr);
55 break;
56 case ZMQ_EVENT_CLOSE_FAILED:
57 printf ("socket close failure error code %d\n", event.value);
58 printf ("socket address %s\n", addr);
59 break;
60 case ZMQ_EVENT_CLOSED:
61 printf ("closed socket descriptor %d\n", event.value);
62 printf ("closed socket address %s\n", addr);
63 break;
64 case ZMQ_EVENT_DISCONNECTED:
65 printf ("disconnected socket descriptor %d\n", event.value);
66 printf ("disconnected socket address %s\n", addr);
67 break;
68 }
69 }
70 zmq_close (s);
71 return NULL;
72 }
73
74 int main()
75 {
76 const char* addr = "tcp://127.0.0.1:6666" ;
77 pthread_t thread ;
78 // Create the infrastructure
79 void *ctx = zmq_init (1);
80 assert (ctx);
81 // REP socket
82 void* rep = zmq_socket (ctx, ZMQ_REP);
83 assert (rep);
84 // REP socket monitor, all events
85 int rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
86 assert (rc == 0);
87 rc = pthread_create (&thread, NULL, rep_socket_monitor, ctx);
88 assert (rc == 0);
89 rc = zmq_bind (rep, addr);
90 assert (rc == 0);
91 // Allow some time for event detection
92 zmq_sleep (1);
93 // Close the REP socket
94 rc = zmq_close (rep);
95 assert (rc == 0);
96 zmq_term (ctx);
97 return 0 ;
98 }