1. 程式人生 > >C++11的6種記憶體序總結__std::memory_order_acquire_等

C++11的6種記憶體序總結__std::memory_order_acquire_等

對於C++11的6種併發查了不少相關資料,這裡作一個總結和理解std::memory_order_relaxed,std::memory_order_consume,std::memory_order_acquire
std::memory_order_release,std::memory_order_acq_rel,std::memory_order_seq_cst

粗淺理解(瞭解大概)

編譯器優化而產生的指令亂序,cpu指令流水線也會產生指令亂序,總體來講,編譯器優化層面會產生的指令亂序,cpu層面也會的指令流水線優化產生亂序指令。當然這些亂序指令都是為了同一個目的,優化執行效率
happens-before:按照程式的程式碼序執行

int a=0,int b=1;
void func(){
    a=b+22;
    b=22;
}
  • 1
  • 2
  • 3
  • 4
  • 5
程式碼沒有被編譯器優化,按照正常指令執行:
movl    b(%rip), %eax ; 將 b 讀入 %eax
addl    $22, %eax ; %eax 加 22, 即 b + 22
movl    %eax, a(%rip) ; % 將 %eax 寫回至 a, 即 a = b + 22
movl    $22, b(%rip) ; 設定 b = 22
優化後:
movl    b(%rip), %eax ; 將 b 讀入 %eax
movl    $22, b(%rip) ; b = 22
addl    $22, %eax ; %eax 加 22
movl    %eax, a(%rip) ; 將 b + 22 的值寫入 a,即 a = b + 2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

synchronized-with:不同執行緒間,對於同一個原子操作,需要同步關係,store()操作一定要先於 load(),也就是說 對於一個原子變數x,先寫x,然後讀x是一個同步的操作,讀x並不會讀取之前的值,而是當前寫x的值。

6種memory_order 主要分成3類,relaxed(鬆弛的記憶體序),sequential_consistency(記憶體一致序),acquire-release(獲取-釋放一致性)


1、relaxed的記憶體序

沒有順序一致性的要求,也就是說同一個執行緒的原子操作還是按照happens-before關係,但不同執行緒間的執行關係是任意。

#include <atomic>
 #include <thread>
 #include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
  x.store(true,std::memory_order_relaxed);  // 1
  y.store(true,std::memory_order_relaxed);  // 2
}
void read_y_then_x()
{
  while(!y.load(std::memory_order_relaxed));  // 3
  if(x.load(std::memory_order_relaxed))  // 4
++z;
}
int main() {
  x=false;
  y=false;
  z=0;
  std::thread a(write_x_then_y);
  std::thread b(read_y_then_x);
  a.join();
b.join();
  assert(z.load()!=0);  // 5
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

其中即使1先於2(同一個執行緒保證原子執行順序)但是在不同執行緒間的執行順序是沒有約束的,所以#4也有可能是false
這裡寫圖片描述


2、sequential consistency(記憶體一致性)

這個是以犧牲優化效率,來保證指令的順序一致執行,相當於不開啟編譯器優化指令,按照正常的指令序執行(happens-before),多執行緒各原子操作也會Synchronized-with,(譬如atomic::load()需要等待atomic::store()寫下元素才能讀取,同步過程),當然這裡還必須得保證一致性,讀操作需要在“一個寫操作對所有處理器可見”的時候才能讀,適用於基於快取的體系結構

#include <atomic>
#include <vector>
#include <iostream>

std::vector<int> data;
std::atomic_bool data_ready(false);

// 執行緒1
void writer_thread()
{
data.push_back(10); // #1:對data的寫操作
data_ready = true; // #2:對data_ready的寫操作
}

// 執行緒2
void reader_thread()
{
while(!data_ready.load()) // #3:對data_ready的讀操作
{
std::this_thread::sleep(std::milliseconds(10));
}
std::cout << ”data is ” << data[0] << ”\n”; // #4:對data的讀操作
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

這裡寫圖片描述
在同一個執行緒中,執行順序,#1->#2 (原子操作),#3->#4(原子操作),指令序順序執行,同時 保證Synchronized-with,#2->#3 必須要先store原子操作,然後在load原子操作。最終保證順序一致性。這裡z一定是1或者2,並不會出現0的情況,出現0的情況表示不一致。出現1 可能有2種,#3為false,說明x先於y寫,此時y可能正在寫。如果y寫完了,#4是一定成功的,z=1,另一個z=1類似,y先於x寫。當然還有x和y同時寫完,則z=2;

當然要保證這種嚴格的順序一致性,需要犧牲優化代價
1、在無快取的體系結構下實現SC

  • 帶有讀旁路的寫緩衝(Write buffers with read bypassing)
    讀操作可以不等待寫操作,導致後續的讀操作越過前面的寫操作,違反程式次序
  • 重疊寫(Overlapping writes)
    對於不同地址的多個寫操作同時進行,導致後續的寫操作越過前面的讀操作,違反程式次序
  • 非阻塞讀(Nonblocking reads)
    多個讀操作同時進行,導致後續的讀操作越過前面的讀操作先執行,違反程式次序

2、 在有快取的體系結構下實現SC
對於帶有快取的體系結構,這種資料的副本(快取)的出現引入了三個額外的問題:

  • 快取一致性協議(cache coherence protocols)
    一個寫操作最終要對所有處理器可見
    對同一地址的寫操作序列化
    cache coherence的定義不能推出SC(不充分):SC要求對所有地址的寫操作序列化。因此我們並不用cache coherence定義SC, 它僅作為一種傳遞新值(newly written value)的機制。
  • 檢查寫完成(detecting write completion)
    這裡寫圖片描述
    假設圖中的處理器帶有直寫快取(write through cache),P2 快取了 Data. 違反SC的直寫快取
    考慮如下執行次序:

P1 先完成了 Data 在記憶體上的寫操作;
P1 沒有等待 Data 的寫結果傳播到 P2 的快取中,繼續進行 Head 的寫操作;P2 讀取到了記憶體中 Head 的新值;
P2 繼續執行,讀到了快取中 Data 的舊值。

這違反SC,因此我們需要延後每個處理器釋出寫確認通知的時間:直至別的處理器發回寫確認通知,才發射下一個寫操作。

  • 維護寫原子性(maintaining write atomicity):
    “將值的改變傳播給多個處理器的快取”這一操作是非原子操作(非瞬發完成的),因此需要採取特殊措施提供寫原子性的假象。因此我們提出兩個要求,這兩個要求將共同保證寫原子性的實現。

這裡寫圖片描述
要求1:針對同一地址的寫操作被序列化(serialized). 上圖闡述了對這個條件的需求:如果對 A 的寫操作不是序列化的,那麼 P3 和 P4 輸出(暫存器 1,2)的結果將會不同,這違反了次序一致性。這種情況可以在基於網路(而不是匯流排)的系統中產生,由於訊息可經不同路徑傳遞,這種系統不 供它們傳遞次序的保證。

要求2:對一個新寫的值的讀操作,必須要等待所有(別的)快取對該寫操作都返回確認通知後才進行。
這裡寫圖片描述

P2 讀 A 為 1
“P2 對 B 的更新”先於“P1 對 A 的更新”到達 P3
P3 獲得 B 的新值,獲得 A 的舊值
這使得 P2 和 P3 看到的對值 A, B 的寫操作次序不同,違反的了寫原子性要求


3、acquire-release 獲取-釋放一致性

這個是對relaxed的加強,relax序由於無法限制多執行緒間的排序,所以引入synchronized-with,但並不一定意味著,統一的操作順序

#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x()
{x.store(true,std::memory_order_seq_cst); } // 1
void write_y()
{  y.store(true,std::memory_order_seq_cst);}  // 2
void read_x_then_y()
{
  while(!x.load(std::memory_order_seq_cst));
  if(y.load(std::memory_order_seq_cst))  // 3
++z; }
void read_y_then_x()
{
  while(!y.load(std::memory_order_seq_cst));
  if(x.load(std::memory_order_seq_cst))  // 4
++z; }
int main() {
  x=false;
  y=false;
  z=0;
  std::thread a(write_x);
  std::thread b(write_y);
  std::thread c(read_x_then_y);
  std::thread d(read_y_then_x);
  a.join();
  b.join();
  c.join();
  d.join();
  assert(z.load()!=0);  // 5
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

如果是relax記憶體序,會出現z=0的情況,畢竟兩個寫x,y的執行緒以及兩讀x-y,讀y-x的執行緒沒有順序一致性要求,可能出現
#1 寫好x,y資料,但x還在緩衝中,並沒有放入記憶體,這時候讀x的資料#4為false,y寫好資料,也放入緩衝,x也沒有從記憶體讀取新值,所以#3也為false,z=0;兩個執行緒的x,y資料不一致,這種帶有快取違反順序一致。

這裡寫圖片描述

還是看例子:

#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1 自旋,等待y被設定為true
  y.store(true,std::memory_order_release);  // 2
}
void read_y_then_x()
{
  while(!y.load(std::memory_order_acquire));  // 3
  if(x.load(std::memory_order_relaxed))  // 4
++z; }
int main() {
  x=false;
  y=false;
  z=0;
  std::thread a(write_x_then_y);
  std::thread b(read_y_then_x);
  a.join();
b.join();
  assert(z.load()!=0);  // 5
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

同一個執行緒 #1->#2, 由於acquire-release,#2->#3 ,又在同一個執行緒中,#3->#4,所以傳遞happens-before, #4一定能夠獲取#1的值,必然為true。
如果#3的while去掉,#3 可能由於#2還沒有寫入資料,導致為false, #4 和 #1 因為relaxed記憶體序,在不同執行緒,所以沒有排序。release-acquire 對一般配對出現,如果都為release或者acquire,則無法同步。

例子:

std::atomic<int> data[5];
std::atomic<bool> sync1(false),sync2(false);
void thread_1()
{
  data[0].store(42,std::memory_order_relaxed);
  data[1].store(97,std::memory_order_relaxed);
  data[2].store(17,std::memory_order_relaxed);
  data[3].store(-141,std::memory_order_relaxed);        data[4].store(2003,std::memory_order_relaxed);   sync1.store(true,std::memory_order_release); // 1.設定sync1
}
void thread_2()
{
while(!sync1.load(std::memory_order_acquire)); // 2.直到sync1設定後,迴圈結束
sync2.store(true,std::memory_order_release); // 3.設定sync2 
}
void thread_3()
{
while(!sync2.load(std::memory_order_acquire)); // 4.直到sync2設定後,迴圈結束 assert(data[0].load(std::memory_order_relaxed)==42); assert(data[1].load(std::memory_order_relaxed)==97); assert(data[2].load(std::memory_order_relaxed)==17); assert(data[3].load(std::memory_order_relaxed)==-141); assert(data[4].load(std::memory_order_relaxed)==2003);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

#1->#2 (迴圈一直到sync1被儲存,sync1被儲存,那麼根據happened-before,前面的陣列也設定了)->#3 (同一執行緒) ->#4
當然,在thread2中包含了 acquire-release,所以可以採用compare_exchange_strong()

std::atomic<int> sync(0);
void thread_1()
{
// ...
  sync.store(1,std::memory_order_release);
}
void thread_2()
{
  int expected=1;
  while(!sync.compare_exchange_strong(expected,2,
  std::memory_order_acq_rel)) //執行acquire-release 
    expected=1;
}
void thread_3()
{
  while(sync.load(std::memory_order_acquire)<2);
// ... }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

鎖住互斥量是一個獲取操作,並且解鎖這個互斥量是一個釋放操作

4、memory_order_consume

這個記憶體序是 “獲取-釋放”的一部分,它依賴於資料,可以展示執行緒間的先行關係。
攜帶依賴:

int a=b+1;
int b=c+1;
a攜帶依賴於b,b攜帶依賴於c,a也就攜帶依賴c

struct X
{
int i;
std::string s;
};
std::atomic<X*> p;
std::atomic<int> a;
void create_x()
{
  X* x=new X;
  x->i=42;
  x->s="hello";
  a.store(99,std::memory_order_relaxed);  // 1
  p.store(x,std::memory_order_release);  // 2
}
void use_x()
{
  X* x;
  while(!(x=p.load(std::memory_order_consume)))  // 3 
    std::this_thread::sleep(std::chrono::microseconds(1));
  assert(x->i==42);  // 4
  assert(x->s=="hello");  // 5
  assert(a.load(std::memory_order_relaxed)==99);  // 6
}
int main() {
  std::thread t1(create_x);
  std::thread t2(use_x);
t1.join();
t2.join(); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

x是個指標,依賴2個數據,int i=42, string s=”hello” , #3迴圈,直到#2 x被store,那麼在相應的依賴資料也設定好了,所以在#4,#5的斷言也就可以通過。但a沒有依賴,且是relaxed,無法判定斷言

當然,你也可以使用kill_dependency()打破依賴鏈,在複雜程式碼中慎用。

int global_data[]={ ... }; std::atomic<int> index;
void f() {
  int i=index.load(std::memory_order_consume);
 do_something_with(global_data[std::kill_dependency(i)]);
}
  • 1
  • 2
  • 3
  • 4
  • 5

打破i與index的依賴鏈

std::vector<int> queue_data;
std::atomic<int> _count;

void populate_queue() {
    unsigned const number_of_items = 1000000;
    queue_data.clear();
    for (int i = 0; i < number_of_items; ++i) {
        queue_data.push_back(i);
    }
    _count.store(number_of_items, std::memory_order_release); // 1 初始化儲存
}
void consume_queue_items()
    {
        while(true)
        {
            int item_index;
            if(0 >= (item_index=_count.fetch_sub(1, std::memory_order_acquire))) // 2 一個“讀-改-寫”操作
            {
                cout<<this_thread::get_id()<<":wait for more items"<<endl; // 3 等待更多元素
                continue;
            }
            cout<<this_thread::get_id()<<":"<<queue_data[item_index-1]<<endl; // 4 安全讀取queue_data
        }
    }
    void play() {
        std::thread a(populate_queue);
        std::thread b(consume_queue_items);
        std::thread c(consume_queue_items);
        a.join();
        b.join();
        c.join();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

生產者,消費者模式。

如果fetch_sub採用std::memory_order_acq_rel (本機測試)
b c 的消費不一樣,b大概每消費100個數據,c才消費一個數據 ?(不是很理解)
其他記憶體序都是交替消費.
實線是先行關係,虛線是釋放順序
這裡寫圖片描述

5、柵欄

最後簡單說下柵欄吧,柵欄相當於給記憶體加了一層柵欄,約束記憶體亂序。典型用法是和 relaxed一起使用。

柵欄操作讓無序變有序

std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
  x.store(true,std::memory_order_relaxed);  // 1
  std::atomic_thread_fence(std::memory_order_release);  // 2
  y.store(true,std::memory_order_relaxed);  // 3
}
void read_y_then_x()
{
  while(!y.load(std::memory_order_relaxed));  // 4
  std::atomic_thread_fence(std::memory_order_acquire);  // 5
  if(x.load(std::memory_order_relaxed))  // 6
++z; }
int main() {
  x=false;
  y=false;
  z=0;
  std::thread a(write_x_then_y);
  std::thread b(read_y_then_x);
  a.join();
b.join();
  assert(z.load()!=0);  // 7
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

在relaxed的例子中加了2道柵欄,#2,#5
柵欄#2同步與柵欄#5,所以 #1和#6就有了先行關係。 #7不會執行

柵欄也會讓非原子操作有序

void write_x_then_y()
{
x=true; // 1 在柵欄前儲存x std::atomic_thread_fence(std::memory_order_release); 
y.store(true,std::memory_order_relaxed); // 2 在柵欄後儲存y
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 3 在#2寫入前,持續等待 
std::atomic_thread_fence(std::memory_order_acquire);
if(x) // 4 這裡讀取到的值,是#1中寫入
++z; }
int main() {
  x=false;
  y=false;
  z=0;
  std::thread a(write_x_then_y);
  std::thread b(read_y_then_x);
  a.join();
b.join();
assert(z.load()!=0); // 5 斷言將不會觸發 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

同上程式碼,斷言不會觸發,x的值一定是#1寫入的

對非原子操作的排序,可以通過使用原子操作進行,這裡“前序”作為“先行”的一部分,就顯得十分重要 了。如果一個非原子操作是“序前”於一個原子操作,並且這個原子操作需要“先行”與另一個執行緒的一個操 作,那麼這個非原子操作也就“先行”於在另外執行緒的那個操作了。

最後是互斥量的基本實現:

一般都是呼叫 具有std::memory_order_acquire語義的 lock操作
主要flag.test_and_set()上的迴圈 ,然後對資料進行修改,最後呼叫unlock(),相當於呼叫帶有 語義的 flag.clear(),基本的互斥量都是這種型別,lock()作為一個獲取操作存在,在同樣的位置上unlock()作為一個釋放操作存在。


參考文件資源:
https://github.com/forhappy/Cplusplus-Concurrency-In-Practice/blob/master/zh/chapter8-Memory-Model/web-resources.md
http://www.parallellabs.com/2011/08/27/c-plus-plus-memory-model/
https://www.zhihu.com/question/24301047
http://www.cnblogs.com/haippy/p/3412858.html
http://preshing.com/