libev+nanomsg實現多執行緒通訊及事件輪詢例項demo
阿新 • • 發佈:2018-12-21
概述:
在我們剛接觸程式編碼的時候,我們要輪詢資料有沒有發過來,我們最多的可能還是使用while+sleep這樣的組合,這對於處理來說是一個效率很低的方法同時還消耗cpu,那麼在多執行緒程式設計中使用libev+nanomsg會不會提高效率呢。下面例項主要工作如下:A B C 三個執行緒通過nanomsg通訊,A執行緒作為主執行緒,控制中樞,B C請求均通過A.那麼在實際應用中,比如B模組是接收客戶請求並解析傳送控制命令的,C模組是負責幹活的,A是總控,所有控制命令通過A下發到別的模組,這樣維護都很方便。那看下怎麼實現的吧。
例項Demo:
#include <stdio.h> #include <stdint.h> #include <unistd.h> #include <string.h> #include <time.h> #include <ev.h> #include <pthread.h> #include <nanomsg/pair.h> #include <nanomsg/nn.h> /*************************************************************************************************** 動態庫:libev nanomsg 概述:A B C 三個執行緒通過nanomsg通訊,A執行緒作為主執行緒,控制中樞,B C請求均通過A. demo示範: A為指令處理模組 B為指令接收模組 C為指令執行模組 B -> A 開燈 A -> C 開燈 C : 執行開燈 C -> A OK A -> B OK 總結: 這只是簡單的測試使用例子,你可以通過在這個框架的基礎上做更多的功能,對於多執行緒程式設計這將是一個不 錯的選擇. ***************************************************************************************************/ typedef struct { int n; //nanomsg socket int s; //nanomsg recieve fd }nanomsg_info_t; typedef struct { nanomsg_info_t ab; nanomsg_info_t ac; }Aloop_ctrl_t; typedef struct { nanomsg_info_t ba; }Bloop_ctrl_t; typedef struct { nanomsg_info_t ca; }Cloop_ctrl_t; /*獲取系統時間列印*/ uint32_t print_timenow() { time_t now; struct tm *tm_now; time(&now); tm_now = localtime(&now); uint32_t times = tm_now->tm_hour * 3600 + tm_now->tm_min * 60 + tm_now->tm_sec; printf("[%02d:%02d:%02d]\r\n", tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec); return times; } /*****************************************子執行緒C相關**********************************************/ static void watcher_c_cb (struct ev_loop *loop ,struct ev_io *w, int revents) { void *user_data = ev_userdata(loop); Cloop_ctrl_t *Cloop_ctrl = (Cloop_ctrl_t *)user_data; uint8_t *dat = NULL; uint32_t bytes = nn_recv(Cloop_ctrl->ca.n, &dat, NN_MSG, NN_DONTWAIT); if (bytes <= 0) { return; } printf("C:%s (A->C)\r\n", (char *)dat); nn_freemsg(dat); //接收成功,傳送OK char *str = "OK"; uint8_t *udata = nn_allocmsg(3, 0); if (NULL != udata) { memcpy(udata, str, 3); nn_send(Cloop_ctrl->ca.n, &udata, NN_MSG, NN_DONTWAIT); } } int C_nanomsg_init(Cloop_ctrl_t *Cloop_ctrl) { Cloop_ctrl->ca.n = nn_socket(AF_SP, NN_PAIR); if (Cloop_ctrl->ca.n < 0) { return -1; } if (nn_connect(Cloop_ctrl->ca.n, "inproc://c2a_loop") < 0) { return -1; } size_t size = sizeof(size_t); if (nn_getsockopt(Cloop_ctrl->ca.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&Cloop_ctrl->ca.s, &size) < 0) { return -1; } return 0; } struct ev_loop* C_loop_init(Cloop_ctrl_t *Cloop_ctrl) { static struct ev_io watcher_c; struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL); if (NULL == loop) { printf("create C loop failed\r\n"); return NULL; } ev_io_init (&watcher_c, watcher_c_cb, Cloop_ctrl->ca.s, EV_READ); ev_io_start (loop, &watcher_c); return loop; } void *C_thread(void *arg) { Cloop_ctrl_t Cloop_ctrl; if (C_nanomsg_init(&Cloop_ctrl) < 0) { printf("nanomsg init failed\r\n"); return ; } struct ev_loop* Cloop = C_loop_init(&Cloop_ctrl); if (NULL == Cloop) { printf("Cloop init failed\r\n"); return ; } ev_set_userdata(Cloop, &Cloop_ctrl); ev_run (Cloop, 0); return ; } /*****************************************子執行緒B相關**********************************************/ static void watcher_b_cb (struct ev_loop *loop ,struct ev_io *w, int revents) { void *user_data = ev_userdata(loop); Bloop_ctrl_t *Bloop_ctrl = (Bloop_ctrl_t *)user_data; uint8_t *dat = NULL; uint32_t bytes = nn_recv(Bloop_ctrl->ba.n, &dat, NN_MSG, NN_DONTWAIT); if (bytes <= 0) { return; } printf("B:%s (A->B)\r\n\r\n", (char *)dat); nn_freemsg(dat); } static void watcher_timer_cb (struct ev_loop *loop ,struct ev_timer *w, int revents) { static int i = 1; char send_data[128] = {0}; void *user_data = ev_userdata(loop); Bloop_ctrl_t *Bloop_ctrl = (Bloop_ctrl_t *)user_data; sprintf(send_data, "Please turn on LED[%d]", i); i ++; int length = strlen(send_data) + 1; uint8_t *udata = nn_allocmsg(length, 0); if (NULL != udata) { memcpy(udata, send_data, length); nn_send(Bloop_ctrl->ba.n, &udata, NN_MSG, NN_DONTWAIT); } //如果定時器不重設,就會預設1秒進入一次回撥 w->repeat = 10; ev_timer_again(loop, w); } int B_nanomsg_init(Bloop_ctrl_t *Bloop_ctrl) { Bloop_ctrl->ba.n = nn_socket(AF_SP, NN_PAIR); if (Bloop_ctrl->ba.n < 0) { return -1; } if (nn_connect(Bloop_ctrl->ba.n, "inproc://b2a_loop") < 0) { return -1; } size_t size = sizeof(size_t); if (nn_getsockopt(Bloop_ctrl->ba.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&Bloop_ctrl->ba.s, &size) < 0) { return -1; } return 0; } struct ev_loop* B_loop_init(Bloop_ctrl_t *Bloop_ctrl) { static struct ev_io watcher_b; static struct ev_timer watcher_timer; struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL); if (NULL == loop) { printf("create loop failed\r\n"); return NULL; } ev_io_init (&watcher_b, watcher_b_cb, Bloop_ctrl->ba.s, EV_READ); ev_timer_init(&watcher_timer, watcher_timer_cb, 5, 1); ev_io_start (loop, &watcher_b); ev_timer_start (loop, &watcher_timer); return loop; } void *B_thread(void *arg) { Bloop_ctrl_t Bloop_ctrl; if (B_nanomsg_init(&Bloop_ctrl) < 0) { printf("nanomsg init failed\r\n"); return ; } struct ev_loop* Bloop = B_loop_init(&Bloop_ctrl); if (NULL == Bloop) { printf("Bloop init failed\r\n"); return ; } ev_set_userdata(Bloop, &Bloop_ctrl); ev_run (Bloop, 0); return ; } /*****************************************主執行緒A相關**********************************************/ static void watcher_ab_cb (struct ev_loop *loop ,struct ev_io *w, int revents) { void *user_data = ev_userdata(loop); Aloop_ctrl_t *Aloop_ctrl = (Aloop_ctrl_t *)user_data; uint8_t *dat = NULL; uint32_t bytes = nn_recv(Aloop_ctrl->ab.n, &dat, NN_MSG, NN_DONTWAIT); if (bytes <= 0) { return; } //轉發到C printf("A:%s (B->A)\r\n", (char *)dat); nn_send(Aloop_ctrl->ac.n, &dat, NN_MSG, NN_DONTWAIT); } static void watcher_ac_cb (struct ev_loop *loop ,struct ev_io *w, int revents) { void *user_data = ev_userdata(loop); Aloop_ctrl_t *Aloop_ctrl = (Aloop_ctrl_t *)user_data; uint8_t *dat = NULL; uint32_t bytes = nn_recv(Aloop_ctrl->ac.n, &dat, NN_MSG, NN_DONTWAIT); if (bytes <= 0) { return; } //轉發到B printf("A:%s (C->A)\r\n", (char *)dat); nn_send(Aloop_ctrl->ab.n, &dat, NN_MSG, NN_DONTWAIT); } /*主事件nanomsg初始化*/ int A_nanomsg_init(Aloop_ctrl_t *Aloop_ctrl) { //ab通訊的nanomsg初始化 Aloop_ctrl->ab.n = nn_socket(AF_SP, NN_PAIR); if (Aloop_ctrl->ab.n < 0) { return -1; } if (nn_bind(Aloop_ctrl->ab.n, "inproc://b2a_loop") < 0) { return -1; } //獲取此埠的接收資料fd描述符 size_t size = sizeof(size_t); if (nn_getsockopt(Aloop_ctrl->ab.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&Aloop_ctrl->ab.s, &size) < 0) { return -1; } //ac通訊的nanomsg初始化 Aloop_ctrl->ac.n = nn_socket(AF_SP, NN_PAIR); if (Aloop_ctrl->ac.n < 0) { return -1; } if (nn_bind(Aloop_ctrl->ac.n, "inproc://c2a_loop") < 0) { return -1; } //獲取此埠的接收資料fd描述符 if (nn_getsockopt(Aloop_ctrl->ac.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&Aloop_ctrl->ac.s, &size) < 0) { return -1; } return 0; } /*主事件迴圈初始化*/ struct ev_loop* A_loop_init(Aloop_ctrl_t *Aloop_ctrl) { static struct ev_io watcher_ab, watcher_ac; struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL); if (NULL == loop) { printf("create loop failed\r\n"); return NULL; } //傳參 ev_set_userdata(loop, Aloop_ctrl); //初始化 ev_io_init (&watcher_ab, watcher_ab_cb, Aloop_ctrl->ab.s, EV_READ); ev_io_init (&watcher_ac, watcher_ac_cb, Aloop_ctrl->ac.s, EV_READ); ev_io_start (loop, &watcher_ab); ev_io_start (loop, &watcher_ac); return loop; } /**************************************************************************************************/ int main() { pthread_t pb,pc; Aloop_ctrl_t Aloop_ctrl; if (A_nanomsg_init(&Aloop_ctrl) < 0) { printf("nanomsg init failed\r\n"); return -1; } struct ev_loop* Aloop = A_loop_init(&Aloop_ctrl); if (NULL == Aloop) { printf("Aloop init failed\r\n"); return -1; } //建立執行緒B if (0 != pthread_create(&pb, NULL, B_thread, NULL)) { printf("create pthread B failed\r\n"); return -1; } //建立執行緒C if (0 != pthread_create(&pc, NULL, C_thread, NULL)) { printf("create pthread C failed\r\n"); return -1; } //執行 ev_run(Aloop, 0); return 0; }
編譯執行:
//編譯
gcc -o ev_nanomsg ev_nanomsg.c -lev -lnanomsg -lpthread
//執行結果
A:Please turn on LED[1] (B->A)
C:Please turn on LED[1] (A->C)
A:OK (C->A)
B:OK (A->B)