1. 程式人生 > >C++併發實戰16: std::atomic原子操作

C++併發實戰16: std::atomic原子操作

     C++中對共享資料的存取在併發條件下可能會引起data race的undifined行為,需要限制併發程式以某種特定的順序執行,有兩種方式:使用mutex保護共享資料,原子操作:針對原子型別操作要不一步完成,要麼不做,不可能出現操作一半被切換CPU,這樣防止由於多執行緒指令交叉執行帶來的可能錯誤。非原子操作下,某個執行緒可能看見的是一個其它執行緒操作未完成的資料。

1  關於bool的原子化

   1.1 std::atomic_flag是一個bool原子型別有兩個狀態:set(flag=true) 和 clear(flag=false),必須被ATOMIC_FLAG_INIT初始化此時flag為clear狀態,相當於靜態初始化。一旦atomic_flag初始化後只有三個操作:test_and_set,clear,析構,均是原子化操作。atomic_flag::test_and_set檢查flag是否被設定,若被設定直接返回true,若沒有設定則設定flag為true後再返回false。atomic_clear()清楚flag標誌即flag=false。不支援拷貝、賦值等操作,這和所有atomic型別一樣,因為兩個原子型別之間操作不能保證原子化。atomic_flag的可操作性不強導致其應用侷限性,還不如atomic<bool>。

    使用atomic_flag作為簡單的自旋鎖例子:本執行緒可以對flag設定了就跳出迴圈,避免使用mutex導致執行緒阻塞

#include <iostream>       // std::cout
#include <atomic>         // std::atomic_flag
#include <thread>         // std::thread
#include <vector>         // std::vector
#include <sstream>        // std::stringstream

std::atomic_flag lock_stream = ATOMIC_FLAG_INIT;//flag處於clear狀態,沒有被設定過
std::stringstream stream;

void append_number(int x) {
  while (lock_stream.test_and_set()) {}//檢查並設定是個原子操作,如以前沒有設定過則退出迴圈,
    //每個執行緒都等待前面一個執行緒將lock_stream狀態清楚後跳出迴圈
  stream << "thread #" << x << '\n'; 
  lock_stream.clear();}
int main (){ 
  std::vector<std::thread> threads; 
  for (int i=1; i<=10; ++i) 
     threads.push_back(std::thread(append_number,i)); 
  for (auto& th : threads) th.join(); std::cout << stream.str(); return 0;
}

        採用class封裝可以用於lock_guard或unique_lock,但是最好不要將此用於任何競態條件下,這是一個busy loop!

class spinlock_mutex
{
   std::atomic_flag flag;
 public:
   spinlock_mutex():
   flag(ATOMIC_FLAG_INIT){}
   void lock()
   {
     while(flag.test_and_set(std::memory_order_acquire));
   }
   void unlock()
   {
     flag.clear(std::memory_order_release);
   }
};
  

2 atomic<T>模板類,生成一個T型別的原子物件,並提供了系列原子操作函式。其中T是trivially  copyable type滿足:要麼全部定義了拷貝/移動/賦值函式,要麼全部沒定義;沒有虛成員;基類或其它任何非static成員都是trivally copyable。典型的內建型別bool、int等屬於trivally copyable。再如class triviall{public: int x};也是。T能夠被memcpy、memcmp函式使用,從而支援compare/exchange系列函式。有一條規則:不要在保護資料中通過使用者自定義型別T通過引數指標或引用使得共享資料超出保護的作用域。atomic<T>編譯器通常會使用一個內部鎖保護,而如果使用者自定義型別T通過引數指標或引用可能產生死鎖。總之限制T可以更利於原子指令。注意某些原子操作可能會失敗,比如atomic<float>、atomic<double>在compare_exchange_strong()時和expected相等但是內建的值表示形式不同於expected,還是返回false,沒有原子算術操作針對浮點數;同理一些使用者自定義的型別T由於記憶體的不同表示形式導致memcmp失敗,從而使得一些相等的值仍返回false。

atomic<T>的成員函式:

template < class T > struct atomic {
    bool is_lock_free() const volatile;//判斷atomic<T>中的T物件是否為lock free的,若是返回true。lock free(鎖無關)指多個執行緒併發訪問T不會出現data race,任何執行緒在任何時刻都可以不受限制的訪問T
    bool is_lock_free() const;
    atomic() = default;//預設建構函式,T未初始化,可能後面被atomic_init(atomic<T>* obj,T val )函式初始化
    constexpr atomic(T val);//T由val初始化
    atomic(const atomic &) = delete;//禁止拷貝
    atomic & operator=(const atomic &) = delete;//atomic物件間的相互賦值被禁止,但是可以顯示轉換再賦值,如atomic<int> a=static_cast<int>(b)這裡假設atomic<int> b
    atomic & operator=(const atomic &) volatile = delete;//atomic間不能賦值
    T operator=(T val) volatile;//可以通過T型別對atomic賦值,如:atomic<int> a;a=10;
    T operator=(T val);
    operator  T() const volatile;//讀取被封裝的T型別值,是個型別轉換操作,預設記憶體序是memory_order_seq需要其它記憶體序則呼叫load
    operator  T() const;//如:atomic<int> a,a==0或者cout<<a<<endl都使用了型別轉換函式
    //以下函式可以指定記憶體序memory_order
    T exchange(T val, memory_order = memory_order_seq_cst) volatile;//將T的值置為val,並返回原來T的值
    T exchange(T val, memory_order = memory_order_seq_cst);
    void store(T val, memory_order = memory_order_seq_cst) volatile;//將T值設為val
    void store(T val, memory_order = memory_order_seq_cst);
    T load(memory_order = memory_order_seq_cst) const volatile;//訪問T值
    T load(memory_order = memory_order_seq_cst) const;
    bool compare_exchange_weak(T& expected, T val, memory_order = memory_order_seq_cst) volatile;//該函式直接比較原子物件所封裝的值與引數 expected 的物理內容,所以某些情況下,物件的比較操作在使用 operator==() 判斷時相等,但 compare_exchange_weak 判斷時卻可能失敗,因為物件底層的物理內容中可能存在位對齊或其他邏輯表示相同但是物理表示不同的值(比如 true 和 2 或 3,它們在邏輯上都表示"真",但在物理上兩者的表示並不相同)。可以虛假的返回false(和expected相同)。若本atomic的T值和expected相同則用val值替換本atomic的T值,返回true;若不同則用本atomic的T值替換expected,返回false。
    bool compare_exchange_weak(T &, T, memory_order = memory_order_seq_cst);
    bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst) volatile;//
與compare_exchange_weak 不同, strong版本的 compare-and-exchange 操作不允許(spuriously 地)返回 false,即原子物件所封裝的值與引數 expected 的物理內容相同,比較操作一定會為 true。不過在某些平臺下,如果演算法本身需要迴圈操作來做檢查, compare_exchange_weak 的效能會更好。因此對於某些不需要採用迴圈操作的演算法而言, 通常採用compare_exchange_strong 更好
    bool compare_exchange_strong(T &, T, memory_order = memory_order_seq_cst);
};


    cplusplus給出的例子之一:

// atomic::compare_exchange_weak example:
#include <iostream>       // std::cout
#include <atomic>         // std::atomic
#include <thread>         // std::thread
#include <vector>         // std::vector
// a simple global linked list:
struct Node { int value; Node* next; };
std::atomic<Node*> list_head (nullptr);
void append (int val) {     // append an element to the list
  Node* newNode = new Node {val,list_head};
  // next is the same as: list_head = newNode, but in a thread-safe way:
  while (!list_head.compare_exchange_weak(newNode->next,newNode)) {}
  // (with newNode->next updated accordingly if some other thread just appended another node)
}
int main ()
{
  // spawn 10 threads to fill the linked list:
  std::vector<std::thread> threads;
  for (int i=0; i<10; ++i) threads.push_back(std::thread(append,i));
  for (auto& th : threads) th.join();
  // print contents:
  for (Node* it = list_head; it!=nullptr; it=it->next)
    std::cout << ' ' << it->value;
  std::cout << '\n';
  // cleanup:
  Node* it; while (it=list_head) {list_head=it->next; delete it;}
  return 0;
}


程式輸出:
 9 8 7 6 5 4 3 2 1 0


3 std::atomic針對整數和指標的特化:

不能像傳統那樣拷貝和賦值,可以通過內建成員函式load(),store(),exchange()完成賦值,支援複合賦值運算,自增自減運算,還有特有的fetch系列函式

整型特化:

指標特化:

函式說明:
T fetch_add (T val, memory_order sync = memory_order_seq_cst) noexcept;//整型
T fetch_add (ptrdiff_t val, memory_order sync = memory_order_seq_cst) noexcept;//指標
將原子物件的封裝值加 val,並返回原子物件的舊值(適用於整形和指標型別的 std::atomic 特化版本),整個過程是原子的

T fetch_and (T val, memory_order sync = memory_order_seq_cst) noexcept;//將原子物件的封裝值按位與 val,並返回原子物件的舊值(只適用於整型的 std::atomic 特化版本),整個過程是原子的。

T fetch_or (T val, memory_order sync = memory_order_seq_cst) noexcept;//將原子物件的封裝值按位或 val,並返回原子物件的舊值(只適用於整型的 std::atomic 特化版本),整個過程是原子的。

 fetch_xor (T val, memory_order sync = memory_order_seq_cst) noexcept;//將原子物件的封裝值按位異或 val,並返回原子物件的舊值(只適用於整型的 std::atomic 特化版本),整個過程是原子的。

operator++
pre-increment (1)	
T operator++() volatile noexcept;
T operator++() noexcept;
post-increment (2)	
T operator++ (int) volatile noexcept;
T operator++ (int) noexcept;
自增運算子過載, 第一種形式 (1) 返回自增後的值(即字首++),第二種形式(2) 返回自增前的值(即字尾++),適用於整形和指標型別的 std::atomic 特化版本。
operator--
自減運算子過載, 第一種形式 (1) 返回自減後的值(即字首--),第二種形式(2) 返回自減前的值(即字尾--),適用於整形和指標型別的 std::atomic 特化版本。
atomic::operator (comp. assign.)
複合賦值運算子過載,主要包含以下形式:
if T is integral (1)	
T operator+= (T val) volatile noexcept;
T operator+= (T val) noexcept;
T operator-= (T val) volatile noexcept;
T operator-= (T val) noexcept;
T operator&= (T val) volatile noexcept;
T operator&= (T val) noexcept;
T operator|= (T val) volatile noexcept;
T operator|= (T val) noexcept;
T operator^= (T val) volatile noexcept;
T operator^= (T val) noexcept;
if T is pointer (2)	
T operator+= (ptrdiff_t val) volatile noexcept;
T operator+= (ptrdiff_t val) noexcept;
T operator-= (ptrdiff_t val) volatile noexcept;
T operator-= (ptrdiff_t val) noexcept;
以上各個 operator 都會有對應的 fetch_* 操作,詳細見下表:

操作符	成員函式	支援型別
複合賦值	等價於	整型	指標型別	其他型別
+	atomic::operator+=	atomic::fetch_add	是	是	否
-	atomic::operator-=	atomic::fetch_sub	是	是	否
&	atomic::operator&=	atomic::fetch_and	是	否	否
|	atomic::operator|=	atomic::fetch_or	是	否	否
^	atomic::operator^=	atomic::fetch_xor	是	否	否



4 C風格的atomic型別及其操作,有點繁雜這裡不贅述了,參看:點選開啟連結點選開啟連結。前面成員函式字首atomic_形成原子函式,函式的第一個引數必須是原子型別,如:

atomic_store (volatile atomic<T>* obj, T val)
如果你需要顯式指定記憶體序,應該使用atomic_store_explicit。所以字首atomic_表示c風格的原子自由函式,字尾_explicit指定記憶體序。

5    atomic<bool>,atomic<bool>同樣不可以賦值(這裡指兩個atomic<bool>間的賦值)、拷貝,但是其可以直接初始化,如:atomic<bool> flag(false); flag=true。atomic_flag::test_and_set()被atomic::exchange()替代,更多操作見atomic<T>。

std::atomic<bool> b;
bool x=b.load(std::memory_order_acquire);
b.store(true);
x=b.exchange(false,std::memory_order_acq_rel);//更改為false並返回原來的值
    compare_exchange_weak/strong函式是保證在比較和交換執行下原子化,但是此函式可能與expected值相等的情形下atomic的T值沒有替換為val,這時atomic值未變且返回false,compare_exchange_weak可能失敗,特別是執行緒數多於CPU核心數時compare-exchange這個指令序列可能CPU不能保證原子化,所以經常在迴圈中:
bool expected=false;
extern atomic<bool> b; // set somewhere else
while(!b.compare_exchange_weak(expected,true) && !expected);

    compare_exchange_strong可以保證當atomic不等於expected時返回false,不需要迴圈保護。     std::atomic_flag是lock free的,但是atomic<bool>不一定是lock free的,可以用atomic<T>::is_lock_free()判斷。      一個例子的程式碼片段:讀者寫者
#include <vector>
#include <atomic>
#include <iostream>
std::vector<int> data;
std::atomic<bool> data_ready(false);
void reader_thread()
{
  while(!data_ready.load()) 
  {
    std::this_thread::sleep(std::milliseconds(1));
  }
  std::cout<<”The answer=”<<data[0]<<”\n”;// 1 
}
void writer_thread()
{
  data.push_back(42);//2  由於1和2處發生了data race,所以需要執行緒同步,可以使用mutex,此處使用atomic<bool>強制執行緒間有個順序關係 
  data_ready=true; 
}

6 std::atomic<T*> 指標原子化型別,和atomic<bool>一樣,其也是不可複製拷貝的,擁有is_lock_free,load,store,exchange,compare_exchange_weak/strong等成員函式,只不過內在成員換為指標型別T*。還提供了指標算術操作,fetch_add()、fetch_sub()記憶體地址的加減(都是原子操作),+=和-=兩個複合賦值操作符,++和--的自增運算子。例如:std::atomic<Foo*> x; x+=3; 表示指向第四個Foo*並返回這個Foo*;x.fetch_add(3)表示x地址向後移動3個,但是返回原來的Foo*。
class Foo{};
Foo some_array[5];
std::atomic<Foo*> p(some_array);
Foo* x=p.fetch_add(2); 
assert(x==some_array);
assert(p.load()==&some_array[2]);
x=(p-=1); 
assert(x==&some_array[1]);
assert(p.load()==&some_array[1]);


7 整數原子型別(如std::atomic<long long>)和其它atomic一樣,均有load,store,exchange,fetch_add,fetch_sub等成員函式,但是整數有自己的一些操作符: fetch_and(), fetch_or(),+=, -=, &=, |=,fetch_xor(),^=,++,--。和atomic<T*>一樣,fetch_系列函式返回的是舊值,複合賦值運算返回的是新值。整數原子型別沒有乘法、除法、位移操作,因為整數原子型別通常用於計數或者位標記。 8 注意std::shared_ptr<T>是原子型別,所以使用shared_ptr是執行緒安全的。C風格的自由原子函式也適用於shared_ptr。
std::shared_ptr<my_data> p;
void process_global_data()
{
  std::shared_ptr<my_data> local=std::atomic_load(&p);
  process_data(local);
}
void update_global_data()
{
  std::shared_ptr<my_data> local(new my_data);
  std::atomic_store(&p,local);
}