C++11的6種記憶體序總結__std::memory_order_acquire_等
對於C++11的6種併發查了不少相關資料,這裡作一個總結和理解std::memory_order_relaxed,std::memory_order_consume,std::memory_order_acquire
std::memory_order_release,std::memory_order_acq_rel,std::memory_order_seq_cst
粗淺理解(瞭解大概)
編譯器優化而產生的指令亂序,cpu指令流水線也會產生指令亂序,總體來講,編譯器優化層面會產生的指令亂序,cpu層面也會的指令流水線優化產生亂序指令。當然這些亂序指令都是為了同一個目的,優化執行效率
happens-before:按照程式的程式碼序執行
int a=0,int b=1;
void func(){
a=b+22;
b=22;
}
- 1
- 2
- 3
- 4
- 5
程式碼沒有被編譯器優化,按照正常指令執行: movl b(%rip), %eax ; 將 b 讀入 %eax addl $22, %eax ; %eax 加 22, 即 b + 22 movl %eax, a(%rip) ; % 將 %eax 寫回至 a, 即 a = b + 22 movl $22, b(%rip) ; 設定 b = 22 優化後: movl b(%rip), %eax ; 將 b 讀入 %eax movl $22, b(%rip) ; b = 22 addl $22, %eax ; %eax 加 22 movl %eax, a(%rip) ; 將 b + 22 的值寫入 a,即 a = b + 2
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
synchronized-with:不同執行緒間,對於同一個原子操作,需要同步關係,store()操作一定要先於 load(),也就是說 對於一個原子變數x,先寫x,然後讀x是一個同步的操作,讀x並不會讀取之前的值,而是當前寫x的值。
6種memory_order 主要分成3類,relaxed(鬆弛的記憶體序),sequential_consistency(記憶體一致序),acquire-release(獲取-釋放一致性)
1、relaxed的記憶體序:
沒有順序一致性的要求,也就是說同一個執行緒的原子操作還是按照happens-before關係,但不同執行緒間的執行關係是任意。
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1
y.store(true,std::memory_order_relaxed); // 2
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 3
if(x.load(std::memory_order_relaxed)) // 4
++z;
}
int main() {
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); // 5
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
其中即使1先於2(同一個執行緒保證原子執行順序)但是在不同執行緒間的執行順序是沒有約束的,所以#4也有可能是false
2、sequential consistency(記憶體一致性)
這個是以犧牲優化效率,來保證指令的順序一致執行,相當於不開啟編譯器優化指令,按照正常的指令序執行(happens-before),多執行緒各原子操作也會Synchronized-with,(譬如atomic::load()需要等待atomic::store()寫下元素才能讀取,同步過程),當然這裡還必須得保證一致性,讀操作需要在“一個寫操作對所有處理器可見”的時候才能讀,適用於基於快取的體系結構。
#include <atomic>
#include <vector>
#include <iostream>
std::vector<int> data;
std::atomic_bool data_ready(false);
// 執行緒1
void writer_thread()
{
data.push_back(10); // #1:對data的寫操作
data_ready = true; // #2:對data_ready的寫操作
}
// 執行緒2
void reader_thread()
{
while(!data_ready.load()) // #3:對data_ready的讀操作
{
std::this_thread::sleep(std::milliseconds(10));
}
std::cout << ”data is ” << data[0] << ”\n”; // #4:對data的讀操作
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
在同一個執行緒中,執行順序,#1->#2 (原子操作),#3->#4(原子操作),指令序順序執行,同時 保證Synchronized-with,#2->#3 必須要先store原子操作,然後在load原子操作。最終保證順序一致性。這裡z一定是1或者2,並不會出現0的情況,出現0的情況表示不一致。出現1 可能有2種,#3為false,說明x先於y寫,此時y可能正在寫。如果y寫完了,#4是一定成功的,z=1,另一個z=1類似,y先於x寫。當然還有x和y同時寫完,則z=2;
當然要保證這種嚴格的順序一致性,需要犧牲優化代價
1、在無快取的體系結構下實現SC
- 帶有讀旁路的寫緩衝(Write buffers with read bypassing)
讀操作可以不等待寫操作,導致後續的讀操作越過前面的寫操作,違反程式次序 - 重疊寫(Overlapping writes)
對於不同地址的多個寫操作同時進行,導致後續的寫操作越過前面的讀操作,違反程式次序 - 非阻塞讀(Nonblocking reads)
多個讀操作同時進行,導致後續的讀操作越過前面的讀操作先執行,違反程式次序
2、 在有快取的體系結構下實現SC
對於帶有快取的體系結構,這種資料的副本(快取)的出現引入了三個額外的問題:
- 快取一致性協議(cache coherence protocols)
一個寫操作最終要對所有處理器可見
對同一地址的寫操作序列化
cache coherence的定義不能推出SC(不充分):SC要求對所有地址的寫操作序列化。因此我們並不用cache coherence定義SC, 它僅作為一種傳遞新值(newly written value)的機制。 - 檢查寫完成(detecting write completion)
假設圖中的處理器帶有直寫快取(write through cache),P2 快取了 Data. 違反SC的直寫快取
考慮如下執行次序:
P1 先完成了 Data 在記憶體上的寫操作;
P1 沒有等待 Data 的寫結果傳播到 P2 的快取中,繼續進行 Head 的寫操作;P2 讀取到了記憶體中 Head 的新值;
P2 繼續執行,讀到了快取中 Data 的舊值。
這違反SC,因此我們需要延後每個處理器釋出寫確認通知的時間:直至別的處理器發回寫確認通知,才發射下一個寫操作。
- 維護寫原子性(maintaining write atomicity):
“將值的改變傳播給多個處理器的快取”這一操作是非原子操作(非瞬發完成的),因此需要採取特殊措施提供寫原子性的假象。因此我們提出兩個要求,這兩個要求將共同保證寫原子性的實現。
要求1:針對同一地址的寫操作被序列化(serialized). 上圖闡述了對這個條件的需求:如果對 A 的寫操作不是序列化的,那麼 P3 和 P4 輸出(暫存器 1,2)的結果將會不同,這違反了次序一致性。這種情況可以在基於網路(而不是匯流排)的系統中產生,由於訊息可經不同路徑傳遞,這種系統不 供它們傳遞次序的保證。
要求2:對一個新寫的值的讀操作,必須要等待所有(別的)快取對該寫操作都返回確認通知後才進行。
P2 讀 A 為 1
“P2 對 B 的更新”先於“P1 對 A 的更新”到達 P3
P3 獲得 B 的新值,獲得 A 的舊值
這使得 P2 和 P3 看到的對值 A, B 的寫操作次序不同,違反的了寫原子性要求
3、acquire-release 獲取-釋放一致性
這個是對relaxed的加強,relax序由於無法限制多執行緒間的排序,所以引入synchronized-with,但並不一定意味著,統一的操作順序
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x()
{x.store(true,std::memory_order_seq_cst); } // 1
void write_y()
{ y.store(true,std::memory_order_seq_cst);} // 2
void read_x_then_y()
{
while(!x.load(std::memory_order_seq_cst));
if(y.load(std::memory_order_seq_cst)) // 3
++z; }
void read_y_then_x()
{
while(!y.load(std::memory_order_seq_cst));
if(x.load(std::memory_order_seq_cst)) // 4
++z; }
int main() {
x=false;
y=false;
z=0;
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load()!=0); // 5
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
如果是relax記憶體序,會出現z=0的情況,畢竟兩個寫x,y的執行緒以及兩讀x-y,讀y-x的執行緒沒有順序一致性要求,可能出現
#1 寫好x,y資料,但x還在緩衝中,並沒有放入記憶體,這時候讀x的資料#4為false,y寫好資料,也放入緩衝,x也沒有從記憶體讀取新值,所以#3也為false,z=0;兩個執行緒的x,y資料不一致,這種帶有快取違反順序一致。
還是看例子:
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1 自旋,等待y被設定為true
y.store(true,std::memory_order_release); // 2
}
void read_y_then_x()
{
while(!y.load(std::memory_order_acquire)); // 3
if(x.load(std::memory_order_relaxed)) // 4
++z; }
int main() {
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); // 5
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
同一個執行緒 #1->#2, 由於acquire-release,#2->#3 ,又在同一個執行緒中,#3->#4,所以傳遞happens-before, #4一定能夠獲取#1的值,必然為true。
如果#3的while去掉,#3 可能由於#2還沒有寫入資料,導致為false, #4 和 #1 因為relaxed記憶體序,在不同執行緒,所以沒有排序。release-acquire 對一般配對出現,如果都為release或者acquire,則無法同步。
例子:
std::atomic<int> data[5];
std::atomic<bool> sync1(false),sync2(false);
void thread_1()
{
data[0].store(42,std::memory_order_relaxed);
data[1].store(97,std::memory_order_relaxed);
data[2].store(17,std::memory_order_relaxed);
data[3].store(-141,std::memory_order_relaxed); data[4].store(2003,std::memory_order_relaxed); sync1.store(true,std::memory_order_release); // 1.設定sync1
}
void thread_2()
{
while(!sync1.load(std::memory_order_acquire)); // 2.直到sync1設定後,迴圈結束
sync2.store(true,std::memory_order_release); // 3.設定sync2
}
void thread_3()
{
while(!sync2.load(std::memory_order_acquire)); // 4.直到sync2設定後,迴圈結束 assert(data[0].load(std::memory_order_relaxed)==42); assert(data[1].load(std::memory_order_relaxed)==97); assert(data[2].load(std::memory_order_relaxed)==17); assert(data[3].load(std::memory_order_relaxed)==-141); assert(data[4].load(std::memory_order_relaxed)==2003);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
#1->#2 (迴圈一直到sync1被儲存,sync1被儲存,那麼根據happened-before,前面的陣列也設定了)->#3 (同一執行緒) ->#4
當然,在thread2中包含了 acquire-release,所以可以採用compare_exchange_strong()
std::atomic<int> sync(0);
void thread_1()
{
// ...
sync.store(1,std::memory_order_release);
}
void thread_2()
{
int expected=1;
while(!sync.compare_exchange_strong(expected,2,
std::memory_order_acq_rel)) //執行acquire-release
expected=1;
}
void thread_3()
{
while(sync.load(std::memory_order_acquire)<2);
// ... }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
鎖住互斥量是一個獲取操作,並且解鎖這個互斥量是一個釋放操作
4、memory_order_consume
這個記憶體序是 “獲取-釋放”的一部分,它依賴於資料,可以展示執行緒間的先行關係。
攜帶依賴:
int a=b+1;
int b=c+1;
a攜帶依賴於b,b攜帶依賴於c,a也就攜帶依賴c
struct X
{
int i;
std::string s;
};
std::atomic<X*> p;
std::atomic<int> a;
void create_x()
{
X* x=new X;
x->i=42;
x->s="hello";
a.store(99,std::memory_order_relaxed); // 1
p.store(x,std::memory_order_release); // 2
}
void use_x()
{
X* x;
while(!(x=p.load(std::memory_order_consume))) // 3
std::this_thread::sleep(std::chrono::microseconds(1));
assert(x->i==42); // 4
assert(x->s=="hello"); // 5
assert(a.load(std::memory_order_relaxed)==99); // 6
}
int main() {
std::thread t1(create_x);
std::thread t2(use_x);
t1.join();
t2.join(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
x是個指標,依賴2個數據,int i=42, string s=”hello” , #3迴圈,直到#2 x被store,那麼在相應的依賴資料也設定好了,所以在#4,#5的斷言也就可以通過。但a沒有依賴,且是relaxed,無法判定斷言
當然,你也可以使用kill_dependency()打破依賴鏈,在複雜程式碼中慎用。
int global_data[]={ ... }; std::atomic<int> index;
void f() {
int i=index.load(std::memory_order_consume);
do_something_with(global_data[std::kill_dependency(i)]);
}
- 1
- 2
- 3
- 4
- 5
打破i與index的依賴鏈
std::vector<int> queue_data;
std::atomic<int> _count;
void populate_queue() {
unsigned const number_of_items = 1000000;
queue_data.clear();
for (int i = 0; i < number_of_items; ++i) {
queue_data.push_back(i);
}
_count.store(number_of_items, std::memory_order_release); // 1 初始化儲存
}
void consume_queue_items()
{
while(true)
{
int item_index;
if(0 >= (item_index=_count.fetch_sub(1, std::memory_order_acquire))) // 2 一個“讀-改-寫”操作
{
cout<<this_thread::get_id()<<":wait for more items"<<endl; // 3 等待更多元素
continue;
}
cout<<this_thread::get_id()<<":"<<queue_data[item_index-1]<<endl; // 4 安全讀取queue_data
}
}
void play() {
std::thread a(populate_queue);
std::thread b(consume_queue_items);
std::thread c(consume_queue_items);
a.join();
b.join();
c.join();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
生產者,消費者模式。
如果fetch_sub採用std::memory_order_acq_rel (本機測試)
b c 的消費不一樣,b大概每消費100個數據,c才消費一個數據 ?(不是很理解)
其他記憶體序都是交替消費.
實線是先行關係,虛線是釋放順序
5、柵欄
最後簡單說下柵欄吧,柵欄相當於給記憶體加了一層柵欄,約束記憶體亂序。典型用法是和 relaxed一起使用。
柵欄操作讓無序變有序
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1
std::atomic_thread_fence(std::memory_order_release); // 2
y.store(true,std::memory_order_relaxed); // 3
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 4
std::atomic_thread_fence(std::memory_order_acquire); // 5
if(x.load(std::memory_order_relaxed)) // 6
++z; }
int main() {
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); // 7
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
在relaxed的例子中加了2道柵欄,#2,#5
柵欄#2同步與柵欄#5,所以 #1和#6就有了先行關係。 #7不會執行
柵欄也會讓非原子操作有序
void write_x_then_y()
{
x=true; // 1 在柵欄前儲存x std::atomic_thread_fence(std::memory_order_release);
y.store(true,std::memory_order_relaxed); // 2 在柵欄後儲存y
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 3 在#2寫入前,持續等待
std::atomic_thread_fence(std::memory_order_acquire);
if(x) // 4 這裡讀取到的值,是#1中寫入
++z; }
int main() {
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); // 5 斷言將不會觸發 }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
同上程式碼,斷言不會觸發,x的值一定是#1寫入的
對非原子操作的排序,可以通過使用原子操作進行,這裡“前序”作為“先行”的一部分,就顯得十分重要 了。如果一個非原子操作是“序前”於一個原子操作,並且這個原子操作需要“先行”與另一個執行緒的一個操 作,那麼這個非原子操作也就“先行”於在另外執行緒的那個操作了。
最後是互斥量的基本實現:
一般都是呼叫 具有std::memory_order_acquire語義的 lock操作
主要flag.test_and_set()上的迴圈 ,然後對資料進行修改,最後呼叫unlock(),相當於呼叫帶有 語義的 flag.clear(),基本的互斥量都是這種型別,lock()作為一個獲取操作存在,在同樣的位置上unlock()作為一個釋放操作存在。
參考文件資源:
https://github.com/forhappy/Cplusplus-Concurrency-In-Practice/blob/master/zh/chapter8-Memory-Model/web-resources.md
http://www.parallellabs.com/2011/08/27/c-plus-plus-memory-model/
https://www.zhihu.com/question/24301047
http://www.cnblogs.com/haippy/p/3412858.html
http://preshing.com/