【DPDK】【ring】從DPDK的ring來看x86無鎖佇列的實現
【前言】
佇列是眾多資料結構中最常見的一種之一。曾經有人和我說過這麼一句話,叫做“程式等於資料結構+演算法”。因此在設計模組、寫程式碼時,佇列常常作為一個很常見的結構出現在模組設計中。DPDK不僅是一個加速網路IO的框架,其內部還提供眾多的功能元件,rte_ring就是DPDK內部提供的一種無鎖佇列,本篇文章將從使用的角度出發闡述DPDK的ring怎麼用?在怎麼用的角度上再來闡述ring無鎖的實現,最後將探討實現無鎖佇列的關鍵以及在不通平臺上如何實現,本文將會探討x86平臺下無鎖佇列的實現。
權當拋磚引玉,有問題請留言指正,感激不盡。
【場景】
程式等於資料結構+演算法。但是場景仍然是最重要的,因為場景取決於我們到底“用不用”某個技術或者是某個元件,亦或是某種資料結構。
做資料面的都應該見過如圖1的這種執行緒模型。
圖1.常見的資料面執行緒模型
圖1是一種常見的資料面模型,比如linux基金會的FD.IO(VPP)採用的就是這種執行緒模型,這種執行緒模型下分工明確:
- Main Thread做管理。常常使用協程驅動實現單執行緒多工(VPP內部實現了一套類似於協程的排程機制,以此來實現單執行緒多工的排程)
- fwd Thread做純轉發。通常為了效能考慮,在轉發路徑上嚴禁有記憶體拷貝和系統呼叫(但是凡事都有例外)。
那麼現在有一種需求,fwd執行緒需要將一些資訊上傳至控制面程序那麼最好的做法是什麼呢?這裡通常有很多種實現方式,但是均和本篇文章的主要討論物件無關,因此不多做討論。
其中一種常見的手段就是通過ring,還有一種場景就是DPDK的multiprocess場景,也同樣可以通過ring來講資料包分發到其他process中。如圖2這種情況
圖2.另外一種常見的場景
這種場景是典型的“僧多肉少”型,就是“processer的數量多於rx佇列數量”,那麼這種場景下注定有一些processer是無法接管網絡卡佇列的,但是我還想發揮這些processer的處理能力,怎麼辦?
那麼常見的方案就是在接管到rx佇列的processer將資料包從rx queue上收上來後,計算資料包的rss,然後將資料包“儘量均勻”的通過ring來發送到那些沒有分配到rx queue的fwd thread上。其實也不光是雲端計算的資料面場景,在很多場景下我們都需要用到佇列,因為佇列是一個再基礎不過的資料結構,因此我們拿DPDK的ring出發,最終闡述無鎖佇列的常見實現方式。
【DPDK ring 從使用出發】
我個人覺得任何一種技術,出發點肯定是“先用再分析”,說白了就是對一種技術或對某一個模組的直觀印象都不是直接分析程式碼就能得到的,都是“先跑起來,玩一下,看看情況”得到的第一印象,因此這裡還是會先從使用的角度出發,先會用再分析實現。如果有用過DPDK Ring,那麼本節可以直接跳過,直接看後面的分析章節。
DPDK的ring程式碼主要以lib的形式整合在DPDK原始碼中,具體程式碼位置為:DPDK根目錄/lib/librte_ring目錄中。以下程式碼均已DPDK 19.11版本作為參照(其他版本基本都是大同小異)。
先介紹一下主要的函式介面:
struct rte_ring * rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags) //建立dpdk的rte_ring void rte_ring_free(struct rte_ring *r) //釋放已經建立的dpdk的rte_ring struct rte_ring * rte_ring_lookup(const char *name) //去尋找一個已經建立好的dpdk的rte_ring static __rte_always_inline unsigned int __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sp, unsigned int *free_space) //此函式為內部方法,所有入隊函式都是此函式的上層封裝 static __rte_always_inline unsigned int __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sc, unsigned int *available) //此函式為內部方法,所有出隊函式都是此函式的上層封裝 static __rte_always_inline unsigned int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) //此函式為批量入隊函式,為多生產者安全(multi producer) static __rte_always_inline unsigned int rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) //此函式為批量入隊函式,為單生產者安全(single producer) static __rte_always_inline unsigned int rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned int n, unsigned int *free_space) //此函式為批量入隊函式,具體安全性質取決於建立佇列時的標誌(flags) static __rte_always_inline unsigned int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) //此函式為批量出隊函式,為多消費者安全(multi consumer) static __rte_always_inline unsigned int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) //此函式為批量出隊函式,為單消費者安全(single consumer) static __rte_always_inline unsigned int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, unsigned int *available) //此函式為批量出隊函式,具體安全性質取決於建立佇列時的標誌(flags) static inline unsigned rte_ring_count(const struct rte_ring *r) //此函式用於檢視佇列中元素的數量
可以看到上述函式列表(只是代表性的一部分)基本分為三類介面
- 建立、銷燬、尋找佇列例項;
- 入隊、出隊;
- 檢視佇列狀態,例如檢視佇列是否為滿,檢視佇列元素個數等等。
圖3.出隊的操作函式
實際上如果讓我們自己設計一個佇列,基本上也逃離不出去這些介面,並且根據圖3可以看出,所有的出隊函式基本都是基於__rte_ring_do_enqueue的封裝而已。
那麼實際使用起來的步驟可以基本可以為以下流程圖描述
圖3.dpdk ring常見的使用流程
使用流程還是非常簡單的,因為佇列本身作為一個常見的資料結構使用起來並不複雜,具體使用的例子可以看dpdk的example/multiprocess/中的例子。
但是使用的時候有幾個地方需要注意:
- ring在建立時呼叫的rte_ring_create函式中最後兩個引數socket_id和flags一定要注意。socket_id這裡的socket不是unix網路程式設計中的socket,而是指的numa節點,numa架構下,如果processer訪問的記憶體和自己不在一個numa node上會產生非常嚴重的效能損耗。flags決定了這個佇列的性質,也就是是“什麼性質的安全”,例如如果指定RING_F_SP_ENQ那麼就會建立一個單生產者安全的佇列(實際上完全是扯淡,建立時的flags實際上影響的並不是佇列本身的性質而是呼叫佇列的函式__rte_ring_do_enqueue引數)
- ring在建立時呼叫的rte_ring_create函式中,大小必須是2的N次冪大小。
- ring的push或者是pop,不是對整個物件進行操作,而是對物件的記憶體進行操作,換句話說push和pop塞入/得到的其實只能是物件的記憶體地址而已,所以效能很高。(這點也符合資料面的設計原則,嚴禁記憶體拷貝,如果是拷貝整個物件那麼勢必會產生額外的記憶體拷貝,傳記憶體既不發生記憶體拷貝,效能又強,為何不這麼做呢?)
可以看到dpdk的rte_ring使用上還是蠻簡單的,因此接下來就從原始碼出發解析一下dpdk的rte_ring的無鎖實現。
【DPDK ring 的無鎖實現】
先說結論:
無鎖的實現依賴於一個彙編指令: cmpxchg 翻譯過來就是compare and change
我們先看看dpdk的ring是如何實現無鎖的,我們拿__rte_ring_do_enqueue和__rte_ring_do_dequeue這兩個函式開刀,這兩個函式分別是入隊和出隊的底層實現函式,其餘所有的入隊和出隊函式都是基於這兩個函式進行了上層封裝而已。
先想一下,在多生產者和多消費者場景下,分別要應付哪些問題?
- 多個生產者,生產位置有衝突,比如生產者A要push 3個元素,生產者B要push 3個元素,如何做到不衝突不覆蓋?
- 生產者和消費者,生產了之後要讓消費者可以消費,消費了之後要讓生產者進行生產。
- 多消費者,和多生產者的問題類似,消費位置衝突,比如消費者A要消費3個元素,消費者B要消費3個元素,如何做到消費不衝突讓每一個消費者都能有元素可以消費?
我們先看第一個問題和第二個問題是如何實現的,但是在分析實際函式的實現之前,我們要先分析一下rte_ring。
struct rte_ring { char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; // ring的名稱,lookup的時候就是根據名稱進行查詢對應的ring int flags; // 標記,用來描述佇列是單/多生產者還是單/多消費者安全 const struct rte_memzone *memzone; // 所屬的memzone,memzone是dpdk記憶體管理底層的資料結構 uint32_t size; // 佇列長,為2^n。如果flags為RING_F_EXACT_SZ // 佇列size為初始化時佇列長度的向上取2的n次冪,例如如果為 // 7,那麼向上取最近的2^n冪的數為8.如果flags不為 // RING_F_EXACT_SZ,那麼初始化佇列的時候佇列長必須為2^n冪 uint32_t mask; // 掩碼,為佇列長 - 1,用來計算位置的時候取餘用 uint32_t capacity; // 佇列容量,一般不等於佇列長度,把佇列容量理解為實際可以 // 使用的元素個數即可。例如初始化時count為7並且指定標誌為 // RING_F_EXACT_SZ,那麼count最後為8,但是capacity為7,因為 // 8是向上取2^n冪取出來的,實際上仍然是建立時所需的個數,8. char pad0 __rte_cache_aligned; // 填充,考慮到效能,要使用填充法保證cache line struct rte_ring_headtail prod __rte_cache_aligned; // 生產者位置,裡面有一個生產者頭,即prod.head,還有一個生 // 產者尾,即prod.tail。prod.head代表著下一次生產時的起始 // 生產位置。prod.tail代表消費者可以消費的位置界限,到達 // prod.tail後就無法繼續消費,通常情況下生產完成後, // prod.tail = prod.head,意味著剛生產的元素皆可以被消費 char pad1 __rte_cache_aligned; struct rte_ring_headtail cons __rte_cache_aligned; // 消費者位置,裡面有一個消費者頭,即cons.head,還有一個消 // 費者尾,即cons.tail。cons.head代表著下一次消費時的起始 // 消費位置。cons.tail代表生產者可以生產的位置界限,到達 // cons.tail後就無法繼續生產,通常情況下消費完成後, // cons.tail = cons.head,意味著剛消費的位置皆可以被生產 char pad2 __rte_cache_aligned; /**< empty cache line */ };
上述資料結構為rte_ring的資料結構,rte_ring就代表著一條ring,是ring的抽象。其中重要的是兩個地方,一個是prod,一個是cons,前者代表生產者,後者代表消費者,裡面分別有兩個標記,關於標記的用途已經在上述程式碼的註釋中闡述。
但是還有一點,ring中存放的資料在哪?dpdk的ring中存放的資料位置可以見圖4.
圖4.dpdk ring的記憶體分佈圖
可以看到,rte_ring的data中存放的是指標(就因為是指標才能利用cmpxchg實現“無鎖”),並且data分佈在struct rte_ring緊鄰的空間中(圖中青色的記憶體塊)。在分析實際的函式前,再看幾個流程圖,結合rte_ring中的資料結構來看,理解會更加深刻(當然這部分的內容在《深入淺出dpdk》一書中的4.4.2節也有描述)。
1.入隊操作,以單生產者單消費者(多生產者和多消費者基本差不多)為例。初始狀態為圖5所示。初始狀態中佇列中有4個元素,分別是obj1、obj2、obj3、obj4.
圖5.初始狀態
2.第一步,新元素入隊,先偏移prod.head到新的生產者頭位置,例如現在位置為5,若生產元素的個數為2,那麼新位置即為index = 7,但是由於涉及到多生產者,其中多生產者無鎖的奧祕就在這一步,因此先佔位置,如圖6。
圖6.入隊的第一步操作
3.第二步,元素寫入。
圖7.入隊的第二步操作
4.第三步,更新生產者的尾指標,也就是prod.tail,因為第二步只是將元素寫入而已,涉及生產-消費的流程,還要告訴消費者“可以消費”,prod.tail的作用便是如此,所以需要更新,但是假設當前消費者開始消費,那麼流程便如圖7所示,消費者的頭標記只能到達生產者尾標記的位置。
圖8.出隊的第一步操作
5.第四步,消費者開始消費元素,此時生產者的tail標記開始更新。
圖9.出隊的第二步操作
6.第五步,與生產者相同,消費者消費資料後,被消費後的空間不能立即用於生產,還需要更新tail標記才可以(cons.tail)
圖10.生產-消費後的最終狀態
接下來,理解了上述生產-消費的流程後,既可以分析具體的函數了,接下來將站在生產者的視角進行分析程式碼實現(消費者與生產者幾乎相同),拿生產者的入隊函式__rte_ring_do_enqueue來分析。
static __rte_always_inline unsigned int __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, unsigned int is_sp, unsigned int *free_space) { uint32_t prod_head, prod_next; uint32_t free_entries; //第一步,先偏移頭指標,搶佔生產位置 n = __rte_ring_move_prod_head(r, is_sp, n, behavior, &prod_head, &prod_next, &free_entries); if (n == 0) goto end; //第二步,塞資料 ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); //第三部,更新尾指標,讓消費者可以消費 update_tail(&r->prod, prod_head, prod_next, is_sp, 1); end: if (free_space != NULL) *free_space = free_entries - n; return n; }
上述程式碼是一個典型的“三步走”。
- 先偏移頭指標,說白了就是搶位置。這步主要是為了對付多生產者的情況。
- 搶到位置後寫資料。
- 更新尾指標,讓消費者可以消費剛塞入的資料。
那麼很顯然,第一步就是對付第一個問題的,即在多生產者下如何讓生產者可以順利生產並且多個生產者之間不會互相沖突,所以需要分析一下__rte_ring_move_prod_head函式。
static __rte_always_inline unsigned int __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries) { const uint32_t capacity = r->capacity; unsigned int max = n; int success; do { //1.先確定生產者要生產多少個元素 n = max; //2.拿到現在生產者的head位置,也就是即將生產的位置 *old_head = r->prod.head; //記憶體屏障 rte_smp_rmb(); //3.計算剩餘的空間 *free_entries = (capacity + r->cons.tail - *old_head); //4.比較生產的元素個數和剩餘空間 if (unlikely(n > *free_entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *free_entries; if (n == 0) return 0; //5.計算生產後的新位置 *new_head = *old_head + n; if (is_sp) r->prod.head = *new_head, success = 1; else //6.如果是多生產者的話呼叫cpmset函式實現生產位置搶佔 success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head); } while (unlikely(success == 0)); return n; }
上述函式邏輯是一個非常簡單的實現邏輯,而關鍵在於第6點和do while迴圈,cmpset函式是什麼?又是如何實現的生產位置搶佔呢?
1 static inline int 2 rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src) 3 { 4 uint8_t res; 5 6 asm volatile( 7 MPLOCKED 8 "cmpxchgl %[src], %[dst];" 9 "sete %[res];" 10 : [res] "=a" (res), /* output */ 11 [dst] "=m" (*dst) 12 : [src] "r" (src), /* input */ 13 "a" (exp), 14 "m" (*dst) 15 : "memory"); /* no-clobber list */ 16 return res; 17 }
上述cmpset為x86體系下的實現,可以看到,是一段GCC內聯的彙編指令,這段內聯的嵌入彙編指令由三個彙編指令構成,最核心的一個指令便是第8行的“cmpxchg”,這便是我們最開始說的 "無鎖的實現依賴於cmpxchg指令",那麼這個指令究竟是什麼意思呢?
cmpxchg指令的意思就是“compare and change”,即“比較並交換”。 舉個例子,如果A等於B,則將C賦值給A;如果A不等於B,則拒絕將C賦值給A。
根據這個特徵我們可以知道,在多生產者場景下,最擔心的事情是什麼呢?最擔心的事情即為“前腳剛計算好生產位置(偏移),後腳還沒等寫入資料,結果就被另外一個生產者把剛剛計算好的生產位置給佔了,結果自己沒得空間生產”,將這個場景結合剛才的cmpxchg之後怎麼解決呢?
如果生產位置沒有變化(A等於B),那麼就將最新的生產位置(計算偏移後的生產位置)賦值給生產者指標;如果生產位置發生了變化(有其他生產者也在生產),那麼就取消更新生產者指標
核心實現就是上面這句話。關於rte_atomic32_cmpset函式,下一章【x86的cas】中會詳細講解。
那麼頭指標偏移部分程式碼的流程圖可以總結如下:
那麼至此,第一個問題之“多生產者如何解決生產位置的問題得到了解決”,那麼接下來就是第三個問題,“如何讓消費者可以消費剛剛生產的資料?”
這個問題在“三步走”中的第三部中解決的。
static __rte_always_inline void update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single, uint32_t enqueue) { //1.記憶體屏障 if (enqueue) rte_smp_wmb(); else rte_smp_rmb(); //2.如果有其他生產者生產資料,那麼需要等待其將資料生產完更新tail指標後,本生產者才能更新tail指標 if (!single) while (unlikely(ht->tail != old_val)) rte_pause(); //3.更新tail指標,更新的位置為最新的生產位置,意味著剛剛生產的資料已經全部可以被消費者消費 ht->tail = new_val; }
這裡面可能唯一會讓人產生些許疑惑的就是step 2.這裡有一個自旋鎖,自旋等待"ht->tail == old_val"條件的成立,這是為什麼呢?想一下這樣的場景:
單生產者單消費者情況下:生產資料成功後,應該講prod.tail指標前移至prod.head處,相當於告訴消費者佇列中的資料都是可以消費的,但是如果此時是多生產者場景,由於有多個生產者,prod.tail指標可能隨時發生變化,例如:
剛開始的時候,prod.head = prod.tail = 0,生產者A生產了3份資料,prod.head = 3並且prod.tail = 0,隨後生產者B生產了2份資料,prod.head = 5並且prod.tail = 0,那麼此時會滿足“ht->tail == old_val”麼?不會,ht->tail = prod.tail = 0,而old_val的值卻為生產元素前的prod.head的值,也就是3.那麼此時需要做的就是等待生產者A將3份資料完全生產完,並且將prod.tail更新至3,那麼此時才會滿足“ht->tail == old_val”。說白了就是得等別的生產者完全生產完才能生產。但是從最終結果而言,生產者A生產了3個元素,生產者B生產了2個元素,最終結果中,prod.tail = 5,也就是剛剛生產的5個元素可以全部被消費者消費。
所以從上面的“__rte_ring_do_enqueue”函式可以看出,想想所謂的無鎖佇列真的實現了理想的“無鎖”麼?
“rte_ring_do_dequeue”的函式執行流程與“__rte_ring_do_enqueue”的流程基本一致,無法後者為生產者視角,而前者為消費者視角,請讀者根據上述“佇列入隊”的分析過程自行分析“隊列出隊”。
【x86的CAS】
可能有的讀者在“無鎖”這個概念上知道“無鎖”的實現是一種"CAS"操作,那麼什麼才是CAS操作呢?
CAS的全程為“Compare And Swap”,意味比較並交換
“比較並交換”,這個概念和前一章中“cmpxchg”指令的含義基本一致。核心思想就是:
和預期結果比較,相同則賦值,不同則放棄
如果和預期不同,那麼我會一遍一遍的去嘗試,當沒有人和我競爭了,和預期結果自然就會“相同”,再回到之前的內聯彙編。
1 static inline int 2 rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src) 3 { 4 uint8_t res; 5 6 asm volatile( 7 MPLOCKED 8 "cmpxchgl %[src], %[dst];" 9 "sete %[res];" 10 : [res] "=a" (res), /* output */ 11 [dst] "=m" (*dst) 12 : [src] "r" (src), /* input */ 13 "a" (exp), 14 "m" (*dst) 15 : "memory"); /* no-clobber list */ 16 return res; 17 }
想讀懂這個函式首先需要先了解內聯彙編的正確寫法和格式。當然,接下來要說的內聯彙編格式為intel格式。由於涉及到內聯彙編的文章有許多,在這裡不會詳細介紹內聯彙編的格式和寫法,更多的會聚焦於此函式的實現。
內聯彙編的函式格式為:
1 asm ( assembler template 2 : output operands /* optional */ 3 : input operands /* optional */ 4 : list of clobbered registers /* optional */ 5 );
很簡單,內聯彙編由4個部分組成:
- assembler template。也就是彙編的指令集合。對應到rte_atomic32_cmpset函式中就是line 7、8、9三行的內容。
- 輸出運算元,也稱為目的運算元,不懂運算元是什麼的可以將它理解為C語言的左值,也就是輸出被賦值的變數,等號左邊的。對應到rte_atomic32_cmpset函式中就是line 10、11的內容。
- 輸入運算元,也稱為源運算元,不懂運算元是什麼的可以將它理解為C語言的右值,也就是輸入賦值的變數,等號右邊的。對應到rte_atomic32_cmpset函式中就是line 12、13、14的內容。
- 被改變的暫存器的值,這個地方看場合,不同的場合不太一樣。對應到rte_atomic32_cmpset函式中就是line 15的內容,也就是記憶體屏障。
- 還有一點需要注意的是,opt-code %1,%2,其中在intel架構下,前者為目的運算元,也就是%1,後者為源運算元%2。
- 還有一個額外的概念就是constraints,也就是約束。對應到rte_atomic32_cmpset函式中就是運算元前面的雙引號部分,例如“a” (exp),這裡雙引號裡面的a就是一個constraints。約束分為很多種,這裡只介紹常見的幾種:
- "a"是一個暫存器約束。用來指定“eax”暫存器,被描述的物件會將值存至eax暫存器;
- "="不算是一個constraints,而是作為一個修飾符,相當於告訴這個元素是“write-only”;
- "r"同樣是一個暫存器約束,用來表明是通用暫存器,被修飾的運算元會被存到通用暫存器中,沒有具體指定的話就是任意;
- “m”是一個記憶體約束,和暫存器約束的區別是,暫存器約束會將值取到暫存器中,參與完計算後會回寫到記憶體中,而記憶體約束就不需要暫存器作為中轉,全程在記憶體中進行,所以速度也會慢於暫存器。
那麼我們接著回到rte_atomic32_cmpset函式的實現,line 7是一個x86架構下的“lock”指令指令字首,注意“lock”其實本質上不是一個指令,而是一個指令字首,也就是用來修飾接下來的指令,支隊接下來的指令有效力,並且修飾的指令必須是對記憶體有“讀-改-寫”三種操作的指令,就比如說cmpxchg指令就是。
#if RTE_MAX_LCORE == 1 #define MPLOCKED /**< No need to insert MP lock prefix. */ #else #define MPLOCKED "lock ; " /**< Insert MP lock prefix. */ #endif
在x86多核架構下,lock指令通常用來確保多核訪問cache line是具有排他性的(相當於一把鎖)。
第一個指令是cmpxchg,關於cmpxchg我們前面已經大致講過此命令的作用。此命令的實際作用是:
比較源運算元和eax暫存器中的值,如果相同,則將目的運算元更新為源運算元,並且將標誌暫存器中的ZF(zero flags)位置1;如果源運算元和eax暫存器中的值不通,則將源運算元寫入eax暫存器中,並將標誌暫存器中的ZF(zero flags)清0
那麼對照上面的場景,一般eax暫存器中存的值都是初始值,也就是還沒有計算入隊偏移的初始值,由於在計算入隊偏移操作時,其他生產者可能也在進行計算入隊偏移,那麼就會起衝突,具體體現就是生產者頭指標發生變化,因此在cmpxchg指令中,再拿生產者頭指標和初始值進行比較,如果相同這說明現在沒有其他生產者在更新,那麼源運算元(當前生產者頭指標)和eax暫存器中的值(事先備份的初始值)必定相同,此時則可以安全的將目的運算元賦值至源運算元,也就是(prod.head = new_head);如果不同,這說明現在可能有其他生產者在生產導致生產者頭指標發生變化(prod.head發生變化),那麼此時便不能更新源運算元(prod.head)。
第二個指令是sete,這個指令就很簡單了,就是單純的將標誌暫存器中的zf位的值賦值給目的運算元,也就是res。那就意味著如果cmpxchg執行交換成功,則zf位為1,那麼經過sete設定後,res返回值也就是1;如果cmpxchg執行交換失敗,則zf為0,那麼經過sete設定後,res的返回值也就是0.
那麼這個函式便是,如果cmpxchg成功,則函式返回1,如果cmpxchg失敗,則函式返回0,那麼根據函式的返回值,上層邏輯便知道更新生產者頭指標是否成功,成功直接返回即可;不成功怎麼辦呢?也很簡單,迴圈,我一次一次試(while迴圈),總會成功的。
可以看到,CAS操作實現無鎖的本質上就是“比較”,比較什麼呢?這取決於我們最擔心什麼?那我們最擔心的是什麼呢?我們最擔心的無非就是
生產者的視角:我剛開始根據舊的生產者頭指標 + 生產的元素數量,計算出生產後的指標位置,結果在我計算的過程中,由於有其他生產者干擾,導致舊的生產者頭指標已經發生了變化,那麼計算出的生產後的指標位置也是失效的。 消費者的視角:我剛開始根據舊的消費者頭指標 + 消費的元素數量,計算出消費後的指標位置,結果在我計算的過程中,由於有其他消費者干擾,導致舊的消費者頭指標已經發生了變化,那麼計算出的消費後的指標位置也是失效的。
所以需要比較什麼呢?比較是“預期值”與“實際值”,預期值是我們希望“舊的生產者頭指標不會發生變化”,那麼實際值便是“當前的生產者頭指標位置”,那麼我只需要比較兩者便可以得知,是否有其他生產者干擾,只有符合預期的情況,我才可以進行接下來的操作,也就是賦值。
【後續】
- CAS操作真的做到了理想的“無鎖”了麼?我個人的想法是並沒有做到,因為那段彙編指令仍然是同時只有一個執行者(processer)通過,只不過是和傳統的mutex、spinlock之類的鎖,critical section更小而已。
- CAS操作的本質是“比較預期值和實際值”,ARM平臺下也有類似的CAS操作,只不過ARM平臺下叫做LL/SC,本質上與CAS相同,都是比較,以後有精力會將ARM平臺的LL/SC也補上。
- 我本人是沒想到一個dpdk ring的分析會寫將近1周...雖然自己明白,但是實際寫成文件,輸出部落格,還是得花上一番心思的。