【C++多執行緒】std::future、std::async、std::promise、std::packaged_task、std::shared_future
如圖以下是標頭檔案<future>中的類容。
std::future<T>
future有兩個類模板,一個獨佔的std::future,也就是隻能被獲取一次,另一個是共享的std::shared_future。std::future<T>是一個類模板,其中T是要儲存的值的型別,std::future 的例項只能與一個指定事件相關聯。std::future物件在內部儲存一個將來會被賦值的值,並提供了一個訪問該值的機制,通過get()成員函式實現。但如果有人檢視在get()函式可用之前通過它來訪問相關的值,那麼get()函式將會阻塞,直到該值可用。std::future的一個物件,可以從某個物件(std::promise和std::packaged_task)或函式(std::async())獲取值,並在不同執行緒之間提供恰當的同步訪問。如
- std::future物件的get()成員函式會等待執行緒執行結束並返回結果,拿不到結果它就會一直等待,感覺有點像join()但是,它是可以獲取結果的。
- std::future物件的wait()成員函式,用於等待執行緒返回,本身並不返回結果,這個效果和 std::thread 的join()更像。
- std::future物件的share()成員函式,將該future物件返回為shared_future的物件。
1 // future example 2 #include <iostream> // std::cout 3 #include <future> // std::async, std::future 4 #include <chrono> // std::chrono::milliseconds 5 6 // a non-optimized way of checking for prime numbers:7 bool is_prime (int x) { 8 for (int i=2; i<x; ++i) if (x%i==0) return false; 9 return true; 10 } 11 12 int main () 13 { 14 // call function asynchronously: 15 std::future<bool> fut = std::async (is_prime,444444443); 16 17 // do something while waiting for function to set future: 18 std::cout << "checking, please wait"; 19 std::chrono::milliseconds span (100); 20 while (fut.wait_for(span)==std::future_status::timeout) 21 std::cout << '.' << std::flush; 22 23 bool x = fut.get(); // retrieve return value 24 25 std::cout << "\n444444443 " << (x?"is":"is not") << " prime.\n"; 26 27 return 0; 28 }
std::future_status是列舉型別,表示非同步任務的執行狀態。std::future和std::shared_future的成員函式wait_for()和wait_until()會返回該型別。型別的取值有
-
std::future_status::ready
-
std::future_status::timeout
-
std::future_status::deferred
std::async()
std::async()是一個函式模板,用來啟動一個非同步任務,啟動起來一個非同步任務之後,它返回一個std::future物件。std::async()使用方法和std::thread()相同。
*std::thread產生的執行緒需要在主執行緒中呼叫需要join或者detach,否則會出現異常,而std::async產生的執行緒不需要我們做任何處理。
1 #include <iostream> 2 #include <future> 3 using namespace std; 4 class A { 5 public: 6 int mythread(int mypar) { 7 cout << mypar << endl; 8 return mypar; 9 } 10 }; 11 12 13 int mythread() { 14 cout << "mythread() start" << "threadid = " << std::this_thread::get_id() << endl; 15 std::chrono::milliseconds dura(5000); 16 std::this_thread::sleep_for(dura); 17 cout << "mythread() end" << "threadid = " << std::this_thread::get_id() << endl; 18 return 5; 19 } 20 21 22 int main() { 23 A a; 24 int tmp = 12; 25 cout << "main" << "threadid = " << std::this_thread::get_id() << endl; 26 std::future<int> result1 = std::async(mythread); 27 cout << "continue........" << endl; 28 cout << result1.get() << endl; //阻塞在這裡等待mythread()執行完畢,拿到結果 29 30 //類成員函式 31 std::future<int> result2 = std::async(&A::mythread, &a, tmp); //第二個引數是物件引用才能保證執行緒裡執行的是同一個物件 32 cout << result2.get() << endl; 33 //或者result2.wait(); 34 cout << "good luck" << endl; 35 return 0; 36 }
我們通過向std::async()傳遞一個引數,改引數是std::launch型別(列舉型別),來達到一些特殊的目的:
1、std::lunch::deferred,(defer推遲,延期)表示執行緒入口函式的呼叫會被延遲,一直到std::future的wait()或者get()函式被呼叫時(由主執行緒呼叫)才會執行;如果wait()或者get()沒有被呼叫,則不會執行。
實際上根本就沒有建立新執行緒。std::lunch::deferred意思時延遲呼叫,並沒有建立新執行緒,是在主執行緒中呼叫的執行緒入口函式。
2、std::launch::async,在呼叫async函式的時候就開始建立新執行緒。就是std::async()的預設情況。
1 #include <iostream> 2 #include <future> 3 using namespace std; 4 5 int mythread() { 6 cout << "mythread() start" << "threadid = " << std::this_thread::get_id() << endl; 7 std::chrono::milliseconds dura(5000); 8 std::this_thread::sleep_for(dura); 9 cout << "mythread() end" << "threadid = " << std::this_thread::get_id() << endl; 10 return 5; 11 } 12 13 14 int main() { 15 cout << "main" << "threadid = " << std::this_thread::get_id() << endl; 16 std::future<int> result1 = std::async(std::launch::deferred ,mythread); 17 cout << "continue........" << endl; 18 cout << result1.get() << endl; //卡在這裡等待mythread()執行完畢,拿到結果 19 cout << "good luck" << endl; 20 return 0; 21 }
std::promise<T>
還有讓std::future 與一個任務例項相關聯的唯一方式,可以將任務包裝入一個 std::packaged_task<> 例項中,或使用 std::promise<> 型別模板顯示設定值。與 std::promise<> 對比, std::packaged_task<> 具有更高層的抽象。
std::promise也是一個類模板,其物件有可能在將來對值進行賦值,每個std::promise物件有一個對應的std::future物件,std::promise儲存的值可被與之關聯的std::future讀取,讀取操作可以發生在其它執行緒。std::promise允許move語義(右值構造,右值賦值),但不允許拷貝(拷貝構造、賦值),std::future亦然。std::promise<void>
是合法的,此時std::promise.set_value不接受任何引數,僅用於通知關聯的std::future.get()解除阻塞。
std::promise和std::future合作共同實現了多執行緒間通訊。
1 #include <iostream> 2 #include <thread> 3 #include <future> 4 #include <chrono> 5 6 // 執行緒B 7 void initiazer(std::promise<int> * promObj) 8 { 9 std::cout << "Thread B" << std::endl; 10 // set the value at proper time 11 std::this_thread::sleep_for(std::chrono::seconds(3)); 12 promObj->set_value(23); 13 } 14 15 int main() 16 { 17 // 執行緒A 18 std::promise<int> promiseObj; 19 std::future<int> futureObj = promiseObj.get_future(); 20 21 std::thread th(initiazer, &promiseObj); // 啟動執行緒B 22 23 // 獲取物件的值,該呼叫在B設定其值後會返回23,在B設定其值前會阻塞 24 std::cout<< futureObj.get() << std::endl; 25 26 th.join(); 27 28 return 0; 29 } 30 31 //輸出23
1 #include <iostream> // std::cout, std::endl 2 #include <thread> // std::thread 3 #include <string> // std::string 4 #include <future> // std::promise, std::future 5 #include <chrono> // seconds 6 using namespace std::chrono; 7 //執行緒B 8 void read(std::future<std::string> *future) { 9 // future會一直阻塞,直到有值到來 10 std::cout << future->get() << std::endl; 11 } 12 //執行緒A 13 int main() { 14 // promise 相當於生產者 15 std::promise<std::string> promise; 16 // future 相當於消費者, 右值構造 17 std::future<std::string> future = promise.get_future(); 18 // 另一執行緒中通過future來讀取promise的值 19 std::thread thread(read, &future); 20 // 讓read等一會兒:) 21 std::this_thread::sleep_for(seconds(1)); 22 // 23 promise.set_value("hello future"); 24 // 等待執行緒執行完成 25 thread.join(); 26 27 return 0; 28 } 29 // 控制檯輸: hello future
如上程式碼中,一旦std::promise物件呼叫set_value設定了物件的值,該物件的共享狀態就變更為ready,std::future物件就能使用get()函式獲取到值。
注意:
- 只能從promise共享狀態獲取一個future物件,不能把兩個future關聯到同一個promise
- 如果promise不設定值或者異常,promise 物件在析構時會自動地設定一個 future_error 異常(broken_promise)來設定其自身的就緒狀態
- promise 物件的set_value只能被呼叫一次,多次呼叫會丟擲std::future_error異常(因為第一次呼叫後狀態變更為ready)
- std::future是通過std::promise::get_future獲取到的,自己構造出來的無效
1 #include <iostream> // std::cout, std::endl 2 #include <thread> // std::thread 3 #include <future> // std::promise, std::future 4 #include <chrono> // seconds 5 using namespace std::chrono; 6 7 void read(std::future<int> future) { 8 try { 9 future.get(); 10 } catch(std::future_error &e) { 11 std::cerr << e.code() << "\n" << e.what() << std::endl; 12 } 13 } 14 15 int main() { 16 std::thread thread; 17 { 18 // 如果promise不設定任何值 19 // 則在promise析構時會自動設定為future_error 20 // 這會造成future.get丟擲該異常 21 std::promise<int> promise; 22 thread = std::thread(read, promise.get_future()); 23 } 24 thread.join(); 25 26 return 0; 27 }
通過std::promise::set_exception函式可以設定自定義異常,該異常最終會被傳遞到std::future,並在其get函式中被丟擲。
1 #include <iostream> 2 #include <future> 3 #include <thread> 4 #include <exception> // std::make_exception_ptr 5 #include <stdexcept> // std::logic_error 6 7 void catch_error(std::future<void> &future) { 8 try { 9 future.get(); 10 } catch (std::logic_error &e) { 11 std::cerr << "logic_error: " << e.what() << std::endl; 12 } 13 } 14 15 int main() { 16 std::promise<void> promise; 17 std::future<void> future = promise.get_future(); 18 19 std::thread thread(catch_error, std::ref(future)); 20 // 自定義異常需要使用make_exception_ptr轉換一下 21 promise.set_exception( 22 std::make_exception_ptr(std::logic_error("caught"))); 23 24 thread.join(); 25 return 0; 26 } 27 // 輸出:logic_error: caught
std::packaged_task<T>
std::packaged_task 包裝一個可呼叫的物件,並且允許非同步獲取該可呼叫物件產生的結果,從包裝可呼叫物件意義上來講,std::packaged_task 與 std::function 類似,只不過 std::packaged_task 將其包裝的可呼叫物件的執行結果傳遞給一個 std::future 物件(該物件通常在另外一個執行緒中獲取 std::packaged_task 任務的執行結果)。
std::packaged_task 物件內部包含了兩個最基本元素,一、被包裝的任務(stored task),任務(task)是一個可呼叫的物件,如函式指標、成員函式指標或者函式物件,二、共享狀態(shared state),用於儲存任務的返回值,可以通過 std::future 物件來達到非同步訪問共享狀態的效果。
可以通過 std::packged_task::get_future 來獲取與共享狀態相關聯的 std::future 物件。在呼叫該函式之後,兩個物件共享相同的共享狀態,具體解釋如下:
- std::packaged_task 物件是非同步 Provider,它在某一時刻通過呼叫被包裝的任務來設定共享狀態的值。
- std::future 物件是一個非同步返回物件,通過它可以獲得共享狀態的值,當然在必要的時候需要等待共享狀態標誌變為 ready.
std::packaged_task 的共享狀態的生命週期一直持續到最後一個與之相關聯的物件被釋放或者銷燬為止。
1 #include <iostream> // std::cout 2 #include <future> // std::packaged_task, std::future 3 #include <chrono> // std::chrono::seconds 4 #include <thread> // std::thread, std::this_thread::sleep_for 5 6 // count down taking a second for each value: 7 int countdown (int from, int to) { 8 for (int i=from; i!=to; --i) { 9 std::cout << i << '\n'; 10 std::this_thread::sleep_for(std::chrono::seconds(1)); 11 } 12 std::cout << "Finished!\n"; 13 return from - to; 14 } 15 16 int main () 17 { 18 std::packaged_task<int(int,int)> task(countdown); // 設定 packaged_task 19 std::future<int> ret = task.get_future(); // 獲得與 packaged_task 共享狀態相關聯的 future 物件. 20 21 std::thread th(std::move(task), 10, 0); //建立一個新執行緒完成計數任務. 22 23 int value = ret.get(); // 等待任務完成並獲取結果. 24 25 std::cout << "The countdown lasted for " << value << " seconds.\n"; 26 27 th.join(); 28 return 0; 29 }
std::shared_future<T>
std::shared_future:也是個類模板,可以讓多個執行緒等待同一個事件,用法和std::future差不多。區別是std::future的 get() 成員函式是轉移資料,只能get()一次;std::shared_future 的 get()成員函式是複製資料,可以get()多次。
獲取多次
1 #include <thread> 2 #include <iostream> 3 #include <future> 4 using namespace std; 5 6 int mythread() { 7 cout << "mythread() start" << "threadid = " << std::this_thread::get_id() << endl; 8 std::chrono::milliseconds dura(5000); 9 std::this_thread::sleep_for(dura); 10 cout << "mythread() end" << "threadid = " << std::this_thread::get_id() << endl; 11 return 5; 12 } 13 14 int main() { 15 cout << "main" << "threadid = " << std::this_thread::get_id() << endl; 16 std::packaged_task<int()> mypt(mythread); 17 std::thread t1(std::ref(mypt)); 18 std::future<int> result = mypt.get_future(); 19 20 bool ifcanget = result.valid(); //判斷future 中的值是不是一個有效值 21 std::shared_future<int> result_s(result.share()); //執行完畢後result_s裡有值,而result裡空了 22 //std::shared_future<int> result_s(std::move(result)); 23 //通過get_future返回值直接構造一個shared_future物件 24 //std::shared_future<int> result_s(mypt.get_future()); 25 t1.join(); 26 27 auto myresult1 = result_s.get(); 28 auto myresult2 = result_s.get(); 29 30 cout << "good luck" << endl; 31 return 0; 32 }
在每一個 std::shared_future 的獨立物件上成員函式呼叫返回的結果還是不同步的,所以為了在多個執行緒訪問一個獨立物件時,避免資料競爭,必須使用鎖來對訪問進行保護。優先使用的辦法:為了替代只有一個拷貝物件的情況,可以讓每個執行緒都擁有自己對應的拷貝物件。這樣,當每個執行緒都通過自己擁有的 std::shared_future 物件獲取結果,那麼多個執行緒訪問共享同步結果就是安全的。
參考
http://www.cplusplus.com/reference/future/