無鎖資料結構:佇列
佇列多種多樣,不同之處在於訊息生產者、消費的數量不同;在於是基於預先分配的buffer有界佇列,還是基於List的無界佇列;在於是否支援優先順序;在於是無鎖非阻塞,還是有鎖;在於嚴格遵守FIFO,公平還是非公平等等。更多細節參見Dmitry Vyukov的文章。
眾所周知,更多特定的佇列需求,勢必需要更加有效的演算法。本文中,只考慮佇列最常見的版本,多個生產者對多個消費者,無界併發佇列,因此不考慮優先順序。
我猜佇列想必是研究人員最喜歡的資料結構,因為它簡單,但卻比棧複雜,因為它有兩端而非一端。正是因為有兩端,那麼一個有趣的問題就出來了:如何在多執行緒環境下管理它們呢?各種版本的佇列演算法紛紛發表,想要做一個全面的描述是不可能的了。我提煉其中一些最流行的演算法簡要介紹一下,首先從經典佇列開始。
經典佇列
經典佇列是一個帶有兩端,即頭和尾的列表。從頭部讀取資料,從尾部寫入資料。
一個標準的簡單佇列
C++123456789101112131415161718192021222324252627 | structNode{Node*m_pNext;};classqueue{Node*m_pHead;Node*m_pTail;public:queue():m_pHead(NULL),m_pTail(NULL){}voidenqueue(Node*p){p->m_pNext=nullptr;if(m_pTail)m_pTail->m_pNext=p;elsem_pHead=p;m_pTail=p;}Node* |
這裡就不要過多糾結於此,它不適用於併發,列出來只是為了印證主題,說明該佇列有多簡單。本文會向大家展示,該佇列適用於併發場景時,其簡單演算法做了哪些變動。
Michael和Scott的演算法被認為是無鎖佇列的經典演算法。
以下程式碼來自libcds庫,它是經典演算法的一種簡單實現。若想檢視全部程式碼,請看cds::intrusive::MSQueue類。程式碼中包含有註釋,避免大家讀起來乏味:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 | boolenqueue(value_type&val){/* 實現細節:NODE_TYPE和VALUE_TYPE- 是不一樣的,因此需要型別轉換。 為了簡單起見,我們假定node_traits :: to_node_ptr - 僅僅是的static_cast<node_type *>( &val ) */node_type *pNew=node_traits::to_node_ptr(val);typename gc::Guard guard;// A guard, for example, Hazard Pointer// Back-off strategy (of the template-argument class)back_off bkoff;node_type *t;// As always in lock-free, we’ll deal with it, till we make the right thing...while(true){/* 保護m_pTail, 在讀取該欄位時 可以規避已刪除記憶體被讀的情形 */t=guard.protect(m_pTail,node_to_value());node_type *pNext=t->m_pNext.load(memory_model::memory_order_acquire);/* 有趣的是:該演算法假定 m_pTail不能指向尾部(Tail), 而是希望通過進一步的呼叫實現對Tail正確的設定。 多執行緒互助就是一個典型的例子 */if(pNext!=nullptr){// 在接下來的執行緒之後// 有必要有效地清理Tailm_pTail.compare_exchange_weak(t,pNext,std::memory_order_release,std::memory_order_relaxed);/* 全部必須從新開始,即使CAS不成功; CAS不成功的,意味著在我們讀取m_pTail之前,它已經被改變 */continue;}node_type *tmp=nullptr;if(t->m_pNext.compare_exchange_strong(tmp,pNew,std::memory_order_release,std::memory_order_relaxed)){// Have successfully added a new element to the queue.break;}/* 執行失敗- 即CAS運算沒有成功 這意味著有人先我們到達 檢測到有併發,為了不惡化CAS 呼叫back_off函式 我們及時地做一個短時間的回退 */bkoff();}/* 通常,我們可以利用元素計數器 顯而易見,此計數器是不很準確: 元素早已新增,我們現在的才進行計數 這樣的計數器只能作為元素數量的清單, 不能作為一個空佇列符號 */++m_ItemCounter;/* 最後,試著改變m_pTail的Tail。 不用在意成功與否, 如果沒有成功, 它們會在dequeue()方法的Oops!註解前後被清理掉 */m_pTail.compare_exchange_strong(t,pNew,std::memory_order_acq_rel,std::memory_order_relaxed);/* 本演算法返回必然是true。 其它演算法,比如有界佇列, 當佇列已滿時,可以返回false 為了保證介面的一致性,enqueue() 總是返回成功的標誌 */returntrue;}value_type *dequeue(){node_type *pNext;back_off bkoff;// We need 2 Hazard Pointers for dequeue typename gc::template GuardArray<2>guards;node_type *h;// Keep trying till we can execute it…while(true){// Read and guard our Head m_pHeadh=guards.protect(0,m_pHead,node_to_value());// and the element that follows the HeadpNext=guards.protect(1,h->m_pNext,node_to_value());// Check: is what we have just read // remains bounded?..if(m_pHead.load(std::memory_order_relaxed)!=h){// no, - someone has managed to spoil everything... // Start all over againcontinue;}/* 此標記顯示佇列已空 注意與tail不同的是, H=head始終不會錯的 */if(pNext==nullptr)returnnullptr;// the queue is empty/* 此處讀取tail,但未採用Hazard Pointer對其進行保護 我們對其指向的上下文(資料結構中欄位)不感興趣 */node_type *t=m_pTail.load(std::memory_order_acquire);if(h==t){/* 糟糕,有頭無尾 元素緊隨頭之後, 並且尾指向頭。 我想到了應該糾錯的時候了 */m_pTail.compare_exchange_strong(t,pNext,std::memory_order_release,std::memory_order_relaxed);// After helping them, we have to start over. // Therefore, the CAS result is not important for uscontinue;}// The most important thing is to link a new Head// That is, we move down the listif(m_pHead.compare_exchange_strong(h,pNext,std::memory_order_release,std::memory_order_relaxed)){// Success! Terminate our infinite loopbreak;}/* 倘若失敗,意味著有其干預; 不干擾它們,退回去小歇片刻 */bkoff();}// Change the not very useful counter of elements, // see the comment in enqueue--m_ItemCounter;// It’s the call of 'remove the h element' functordispose_node(h);/* /* 有趣的是, 返回[ex] Head後面的元素, 但pNext仍然在佇列中- 這是佇列的新頭部! */*/returnpNext;} |
正如大家所看到的,佇列由一個有頭有尾的單鏈表組成。
演算法的核心是什麼呢?通過常規的CAS控制兩個指標——這倆指標分別指向頭部的和尾部。實際上得到的佇列永遠不為空。檢視程式碼,是否有任何一處對頭和尾做了nullptr檢查?沒有吧。非空的佇列構造器中,新增啞元素(dummy element)給它,作為頭和尾。出隊返回一個元素,該元素作為一個新的頭啞元素,其前面的啞元素被移除。
(譯者注:所謂啞元素,僅是為了佔一個位置,讓連結串列永遠不為空,從而簡化判斷的邊緣條件,其資料部分沒有任何意義)
在設計侵入式佇列時必須考慮,返回指標是佇列的一部分,僅在下一次出隊時可以移除它。
其次,演算法假定尾部指標不指向最後一個元素。每一次讀取尾部,需檢查尾部是否包含下一個m_pNext元素。倘若該指標不為nullptr,說明tail位置不對,應該後移。但這裡有另外一個陷阱:或許tail會指向head前面的元素。為了避免這一點,出隊方法中對m_pTail->m_pNext做了隱式地檢查:先讀取head,m_pHead->m_pNext元素緊隨其後,確保pNext != nullptr。接著看到head等於tail,tail後面必然還有元素,即pNext,此時應該後移tail。這是一個典型的執行緒互助案例,它在無鎖程式設計中很常見。
2000年,小範圍的演算法優化被提出。