synchronized/wait/notify 與 mutex/cond wait wake ~ 連結串列佇列 生產消費問題
阿新 • • 發佈:2018-12-24
使用條件變數 + 互斥區
http://blogread.cn/it/article/7248?f=catetitle
http://baike.baidu.com/link?url=mFxsi1w7pYQI3p-C175_u14hB0fCbYFr4JqPlNpfEZEbn4l1wZLuHuLgsrc__rvA815BnG99hyUoYgq1SGsw5a
類比: c++ pthread_mutex_[un]lock(obj) 與 java synchronize(obj)
c++ pthread_cond_wait(cond, obj) 與 java wait(obj)
#include<pthread.h> #include<unistd.h> #include<stdio.h> #include<string.h> #include<stdlib.h> static pthread_mutex_t mtx=PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond=PTHREAD_COND_INITIALIZER; struct node { int n_number; struct node *n_next; } *head=NULL; /*[thread_func]*/ /*釋放節點記憶體*/ static void cleanup_handler(void*arg) { printf("Clean up handler of second thread.\n"); free(arg); (void)pthread_mutex_unlock(&mtx); } static void *thread_func(void *arg) { struct node*p=NULL; pthread_cleanup_push(cleanup_handler,p); pthread_mutex_lock(&mtx); //這個mutex_lock主要是用來保護wait等待臨界時期的情況, //當在wait為放入佇列時,這時,已經存在Head條件等待啟用 //的條件,此時可能會漏掉這種處理 //這個while要特別說明一下,單個pthread_cond_wait功能很完善, //為何這裡要有一個while(head==NULL)呢?因為pthread_cond_wait //裡的執行緒可能會被意外喚醒,如果這個時候head==NULL, //則不是我們想要的情況。這個時候, //應該讓執行緒繼續進入pthread_cond_wait while(1) { while(head==NULL) { pthread_cond_wait(&cond,&mtx); } //pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的mtx, //然後阻塞在等待佇列裡休眠,直到再次被喚醒 //(大多數情況下是等待的條件成立而被喚醒,喚醒後, //該程序會先鎖定先pthread_mutex_lock(&mtx);, //再讀取資源用這個流程是比較清楚的 /*block-->unlock-->wait()return-->lock*/ p=head; head=head->n_next; printf("Got%dfromfrontofqueue\n",p->n_number); free(p); } pthread_mutex_unlock(&mtx);//臨界區資料操作完畢,釋放互斥鎖 pthread_cleanup_pop(0); return 0; } int main(void) { pthread_t tid; int i; struct node *p; pthread_create(&tid,NULL,thread_func,NULL); //子執行緒會一直等待資源,類似生產者和消費者, //但是這裡的消費者可以是多個消費者, //而不僅僅支援普通的單個消費者,這個模型雖然簡單, //但是很強大 for(i=0;i<10;i++) { p=(struct node*)malloc(sizeof(struct node)); p->n_number=i; pthread_mutex_lock(&mtx);//需要操作head這個臨界資源,先加鎖, p->n_next=head; head=p; pthread_cond_signal(&cond); pthread_mutex_unlock(&mtx);//解鎖 sleep(1); } printf("thread1wannaendthecancelthread2.\n"); pthread_cancel(tid); //關於pthread_cancel,有一點額外的說明,它是從外部終止子執行緒, //子執行緒會在最近的取消點,退出執行緒,而在我們的程式碼裡,最近的 //取消點肯定就是pthread_cond_wait()了。 pthread_join(tid,NULL); printf("Alldone--exiting\n"); return 0; }
附上c++版synchronize/wait/notify條件變數實現(cond)
http://blogread.cn/it/article/7248?f=catetitleclass NormalCond : public Cond { public: NormalCond() { pthread_mutex_init(&_mutex, NULL); pthread_cond_init(&_cond, NULL); } ~NormalCond() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } void lock() { pthread_mutex_lock(&_mutex); } void unlock() { pthread_mutex_unlock(&_mutex); } void wait(size_t) { pthread_cond_wait(&_cond, &_mutex); } void wake() { pthread_cond_signal(&_cond); } private: pthread_mutex_t _mutex; pthread_cond_t _cond; }; class LayeredCond : public Cond { public: LayeredCond(size_t layers = 1) : _value(0), _layers(layers) { pthread_mutex_init(&_mutex, NULL); if (_layers > sizeof(int)*8) { printf("FATAL: cannot support such layer %u (max %u)\n", _layers, sizeof(int)*8); abort(); } _waiters = new size_t[_layers]; memset(_waiters, 0, sizeof(size_t)*_layers); } ~LayeredCond() { pthread_mutex_destroy(&_mutex); delete _waiters; _waiters = NULL; } void lock() { pthread_mutex_lock(&_mutex); } void unlock() { pthread_mutex_unlock(&_mutex); } void wait(size_t layer) {//cond wait if (layer >= _layers) { printf("FATAL: layer overflow (%u/%u)\n", layer, _layers); abort(); } _waiters[layer]++; //record waiter threads on condition "_value" while (_value == 0) { int value = _value; unlock(); syscall(__NR_futex, &_value, FUTEX_WAIT_BITSET, value,//suspend and wait for cond wake NULL, NULL, layer2mask(layer)); lock(); //when waked, try to get lock again } _waiters[layer]--; _value--; } void wake() { int mask = ~0; lock(); for (size_t i = 0; i < _layers; i++) { if (_waiters[i] > 0) { mask = layer2mask(i); break; } } _value++; unlock(); syscall(__NR_futex, &_value, FUTEX_WAKE_BITSET, 1, NULL, NULL, mask); } private: int layer2mask(size_t layer) { return 1 << layer; } private: pthread_mutex_t _mutex; int _value; size_t* _waiters; size_t _layers; }; template<class T> class Stack { public: Stack(size_t size, size_t cond_layers = 0) : _size(size), _sp(0) { _buf = new T*[_size]; _cond = (cond_layers > 0) ? (Cond*)new LayeredCond(cond_layers) : (Cond*)new NormalCond(); } ~Stack() { delete []_buf; delete _cond; } T* pop(size_t layer = 0) { T* ret = NULL; _cond->lock(); do { if (_sp > 0) { ret = _buf[--_sp]; } else { _cond->wait(layer); } } while (ret == NULL); _cond->unlock(); return ret; } void push(T* obj) { _cond->lock(); if (_sp >= _size) { printf("FATAL: stack overflow\n"); abort(); } _buf[_sp++] = obj; _cond->unlock(); _cond->wake(); } private: const size_t _size; size_t _sp; T** _buf; Cond* _cond; };