linux c++ 迴圈訊息佇列寫法
阿新 • • 發佈:2019-02-04
很多場合,我們需要一個迴圈訊息處理模式,一個執行緒產生訊息,一個執行緒進行處理。產生訊息的執行緒就不用阻塞了,可以用了幹其他的事情了。常見串列埠或者網路通訊,負責解析位元組流的執行緒將訊息初步解析然後放置到一個訊息佇列裡面,處理執行緒負責迴圈取出訊息佇列裡面的訊息進行相應的動作。下面是一在liunx下一個基本實現框架:
typedef struct Message { char * msg; char * res; Message(void) { msg = NULL; res = NULL; } void Init(const char *m,const char * r) { if(m && strlen(m)>0) { msg = (char *)malloc(strlen(m)+1); if(msg) { memset(msg,0,strlen(m)+1); strcpy(msg,m); } } else { msg = NULL; } if(r && strlen(r)>0) { res = (char *)malloc(strlen(r)+1); if(res) { memset(res,0,strlen(r)+1); strcpy(res,r); } } else { res = NULL; } } void Free() { if(msg) free(msg); if(res) free(res); } }Message; //訊息佇列 typedef struct Msg_list { pthread_mutex_t lock; pthread_cond_t cond; std::list<Message> msg_list; Msg_list() { pthread_mutex_init(&lock,NULL); pthread_cond_init(&cond,NULL); } void push(Message m) { pthread_mutex_lock(&lock); msg_list.push_back(m); pthread_mutex_unlock(&lock); pthread_cond_signal(&cond); } Message pop() { int get =0 ; Message message; do{ if(!msg_list.empty()) { message = msg_list.front(); msg_list.pop_front(); get = 1; } else { pthread_mutex_lock(&lock); pthread_cond_wait(&cond,&lock);//進入該函式會釋放lock,等待cond收到訊號後,會鎖定lock pthread_mutex_unlock(&lock); } }while(!get); return message; } }Msg_list; Msg_list g_messages; //消費者執行緒 void * msg_handle_thr(void * arg) { Message m; while(1) { m = g_messages.pop(); if(m.msg && m.res) { printf("msg:[%s]\n",m.msg); #if DEMO_360 int type = -1; json_object *msg_obj = NULL; const char *file = NULL; const char *url = NULL; //這裡解析msg訊息體,以下解析方式只是我們的demo示例。 msg_obj = json_tokener_parse(m.msg); if(is_error(msg_obj)) { printf("msg is invalid\n"); goto con; } type = json_helper_get_int(msg_obj, "type",-1); if(2 == type) { file = json_helper_get_str(msg_obj,"video_name"); if(file) { //這裡就是將res原封不動的返回給SDK Qihoo_IOTS_upload_file(file,m.res); } } else if(1 == type) { url = json_helper_get_str(msg_obj,"url"); if(url) { //這裡就是將res原封不動的返回給SDK if(Qihoo_IOTS_download_file("/tmp/12345.mp4",url,m.res) == 0) { printf("send response\n"); //這裡就是將res原封不動的返回給SDK Qihoo_IOTS_send_response("ok",m.res); } } } else { printf("User message type:%d\n",type); } con: if(msg_obj != NULL && !is_error(msg_obj)) json_helper_put_safe(msg_obj); #endif } m.Free(); } return NULL; } //生產者執行緒 void recive_msg(const char* msg,const char*res) { /*注意:該介面中只是將訊息插入list,並未處理, *因為該介面不能夠阻塞,需要立即返回。 *msg為實際收到的訊息體, *res為一些介面的依賴資訊,應用中並不需要解析,只是暫存,然後再塞給相應的介面。 * */ Message message; message.Init(msg,res); g_messages.push(message); return; }