1. 程式人生 > >無鎖資料結構:佇列

無鎖資料結構:佇列

佇列多種多樣,不同之處在於訊息生產者、消費的數量不同;在於是基於預先分配的buffer有界佇列,還是基於List的無界佇列;在於是否支援優先順序;在於是無鎖非阻塞,還是有鎖;在於嚴格遵守FIFO,公平還是非公平等等。更多細節參見Dmitry Vyukov的文章

眾所周知,更多特定的佇列需求,勢必需要更加有效的演算法。本文中,只考慮佇列最常見的版本,多個生產者對多個消費者,無界併發佇列,因此不考慮優先順序。

我猜佇列想必是研究人員最喜歡的資料結構,因為它簡單,但卻比棧複雜,因為它有兩端而非一端。正是因為有兩端,那麼一個有趣的問題就出來了:如何在多執行緒環境下管理它們呢?各種版本的佇列演算法紛紛發表,想要做一個全面的描述是不可能的了。我提煉其中一些最流行的演算法簡要介紹一下,首先從經典佇列開始。

經典佇列

經典佇列是一個帶有兩端,即頭和尾的列表。從頭部讀取資料,從尾部寫入資料。

一個標準的簡單佇列

C++
123456789101112131415161718192021222324252627 structNode{Node*m_pNext;};classqueue{Node*m_pHead;Node*m_pTail;public:queue():m_pHead(NULL),m_pTail(NULL){}voidenqueue(Node*p){p->m_pNext=nullptr;if(m_pTail)m_pTail->m_pNext=p;elsem_pHead=p;m_pTail=p;}Node*
dequeue(){if(!m_pHead)returnnullptr;Node*p=m_pHead;m_pHead=p->m_pNext;if(!m_pHead)m_pTail=nullptr;returnp;}};

這裡就不要過多糾結於此,它不適用於併發,列出來只是為了印證主題,說明該佇列有多簡單。本文會向大家展示,該佇列適用於併發場景時,其簡單演算法做了哪些變動。

Michael和Scott的演算法被認為是無鎖佇列的經典演算法。

以下程式碼來自libcds庫,它是經典演算法的一種簡單實現。若想檢視全部程式碼,請看cds::intrusive::MSQueue類。程式碼中包含有註釋,避免大家讀起來乏味:

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 boolenqueue(value_type&val){/*         實現細節:NODE_TYPE和VALUE_TYPE-        是不一樣的,因此需要型別轉換。       為了簡單起見,我們假定node_traits :: to_node_ptr -      僅僅是的static_cast<node_type *>( &val )   */node_type *pNew=node_traits::to_node_ptr(val);typename gc::Guard guard;// A guard, for example, Hazard Pointer// Back-off strategy (of the template-argument class)back_off bkoff;node_type *t;// As always in lock-free, we’ll deal with it, till we make the right thing...while(true){/*            保護m_pTail, 在讀取該欄位時          可以規避已刪除記憶體被讀的情形          */t=guard.protect(m_pTail,node_to_value());node_type *pNext=t->m_pNext.load(memory_model::memory_order_acquire);/*            有趣的是:該演算法假定            m_pTail不能指向尾部(Tail),            而是希望通過進一步的呼叫實現對Tail正確的設定。            多執行緒互助就是一個典型的例子            */if(pNext!=nullptr){// 在接下來的執行緒之後// 有必要有效地清理Tailm_pTail.compare_exchange_weak(t,pNext,std::memory_order_release,std::memory_order_relaxed);/*                  全部必須從新開始,即使CAS不成功;                  CAS不成功的,意味著在我們讀取m_pTail之前,它已經被改變                  */continue;}node_type *tmp=nullptr;if(t->m_pNext.compare_exchange_strong(tmp,pNew,std::memory_order_release,std::memory_order_relaxed)){// Have successfully added a new element to the queue.break;}/*          執行失敗- 即CAS運算沒有成功          這意味著有人先我們到達          檢測到有併發,為了不惡化CAS          呼叫back_off函式          我們及時地做一個短時間的回退          */bkoff();}/*       通常,我們可以利用元素計數器      顯而易見,此計數器是不很準確:      元素早已新增,我們現在的才進行計數      這樣的計數器只能作為元素數量的清單,      不能作為一個空佇列符號      */++m_ItemCounter;/*      最後,試著改變m_pTail的Tail。      不用在意成功與否,      如果沒有成功,      它們會在dequeue()方法的Oops!註解前後被清理掉      */m_pTail.compare_exchange_strong(t,pNew,std::memory_order_acq_rel,std::memory_order_relaxed);/*    本演算法返回必然是true。    其它演算法,比如有界佇列,    當佇列已滿時,可以返回false    為了保證介面的一致性,enqueue()    總是返回成功的標誌    */returntrue;}value_type *dequeue(){node_type *pNext;back_off bkoff;// We need 2 Hazard Pointers for dequeue typename gc::template GuardArray<2>guards;node_type *h;// Keep trying till we can execute it…while(true){// Read and guard our Head m_pHeadh=guards.protect(0,m_pHead,node_to_value());// and the element that follows the HeadpNext=guards.protect(1,h->m_pNext,node_to_value());// Check: is what we have just read // remains bounded?..if(m_pHead.load(std::memory_order_relaxed)!=h){// no, - someone has managed to spoil everything... // Start all over againcontinue;}/*            此標記顯示佇列已空            注意與tail不同的是,            H=head始終不會錯的            */if(pNext==nullptr)returnnullptr;// the queue is empty/*            此處讀取tail,但未採用Hazard Pointer對其進行保護            我們對其指向的上下文(資料結構中欄位)不感興趣            */node_type *t=m_pTail.load(std::memory_order_acquire);if(h==t){/*                  糟糕,有頭無尾                  元素緊隨頭之後,                  並且尾指向頭。                  我想到了應該糾錯的時候了                  */m_pTail.compare_exchange_strong(t,pNext,std::memory_order_release,std::memory_order_relaxed);// After helping them, we have to start over. // Therefore, the CAS result is not important for uscontinue;}// The most important thing is to link a new Head// That is, we move down the listif(m_pHead.compare_exchange_strong(h,pNext,std::memory_order_release,std::memory_order_relaxed)){// Success! Terminate our infinite loopbreak;}/*            倘若失敗,意味著有其干預;            不干擾它們,退回去小歇片刻            */bkoff();}// Change the not very useful counter of elements, // see the comment in enqueue--m_ItemCounter;// It’s the call of 'remove the h element' functordispose_node(h);/*    /*   有趣的是,   返回[ex] Head後面的元素,   但pNext仍然在佇列中-   這是佇列的新頭部!  */*/returnpNext;}

正如大家所看到的,佇列由一個有頭有尾的單鏈表組成。

演算法的核心是什麼呢?通過常規的CAS控制兩個指標——這倆指標分別指向頭部的和尾部。實際上得到的佇列永遠不為空。檢視程式碼,是否有任何一處對頭和尾做了nullptr檢查?沒有吧。非空的佇列構造器中,新增啞元素(dummy element)給它,作為頭和尾。出隊返回一個元素,該元素作為一個新的頭啞元素,其前面的啞元素被移除。

(譯者注:所謂啞元素,僅是為了佔一個位置,讓連結串列永遠不為空,從而簡化判斷的邊緣條件,其資料部分沒有任何意義)

在設計侵入式佇列時必須考慮,返回指標是佇列的一部分,僅在下一次出隊時可以移除它。

其次,演算法假定尾部指標不指向最後一個元素。每一次讀取尾部,需檢查尾部是否包含下一個m_pNext元素。倘若該指標不為nullptr說明tail位置不對,應該後移。但這裡有另外一個陷阱:或許tail會指向head前面的元素。為了避免這一點,出隊方法中對m_pTail->m_pNext做了隱式地檢查:先讀取head,m_pHead->m_pNext元素緊隨其後,確保pNext != nullptr。接著看到head等於tail,tail後面必然還有元素,即pNext,此時應該後移tail。這是一個典型的執行緒互助案例,它在無鎖程式設計中很常見。

2000年,小範圍的演算法優化被提出