在 C++ 中使用 PPL 進行非同步程式設計
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow
也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!
萊塢星探通常要拒絕那些有抱負的演員時,都會輕蔑地說:“別聯絡我們,我們會聯絡你的。”然而對於開發人員來說,那句話卻道出了許多軟體框架工作的祕密,與其讓程式設計師驅動整個應用程式的控制流,不如讓框架控制環境並呼叫程式設計師提供的回撥或事件處理程式。在非同步系統中,本範例讓你將非同步操作的開始與完成進行分離。
在 Windows 8 中,非同步操作很普遍,並且 WinRT 提供了一個新程式設計模型,以一致方式對非同步進行處理。
圖 1 演示了處理非同步操作的基本模式。在這段程式碼中,C++ 函式從檔案讀取字串。
圖 1 從檔案進行讀取
- template
- void ReadString(String^ fileName, Callback func)
- {
- StorageFolder^ item = KnownFolders::PicturesLibrary;
- auto getFileOp = item->GetFileAsync(fileName);
- getFileOp->Completed = ref new
- ([=](IAsyncOperation<StorageFile^>^ operation, AsyncStatus status)
- {
- auto storageFile = operation->GetResults();
- auto openOp = storageFile->OpenAsync(FileAccessMode::Read);
- openOp->Completed =
- ref new AsyncOperationCompletedHandler <IRandomAccessStream^>
- ([=](IAsyncOperation<IRandomAccessStream^>^ operation, AsyncStatus status)
- {
- auto istream = operation->GetResults();
- auto reader = ref new DataReader(istream);
- auto loadOp = reader->LoadAsync(istream->Size);
- loadOp->Completed = ref new AsyncOperationCompletedHandler<UINT>
- ([=](IAsyncOperation<UINT>^ operation, AsyncStatus status)
- {
- auto bytesRead = operation->GetResults();
- auto str = reader->ReadString(bytesRead);
- func(str);
- });
- });
- });
- }
要注意的第一件事情是 ReadString 的返回型別為 void。沒錯:該函式不返回值;相反,它使用使用者提供的回撥,並在結果可用時呼叫回撥。歡迎來到非同步程式設計的世界:“別聯絡我們,我們會聯絡你的!”
WinRT 非同步操作的分析
WinRT 中非同步的核心是在 Windows::Foundation 名稱空間中定義的四個介面:IAsyncOperation、IAsyncAction、IAsyncOperationWithProgress 和 IAsyncActionWithProgress。WinRT 中所有潛在阻止或長期執行的操作都被定義為非同步。按照慣例,方法的名稱都以“Async”結尾,而返回型別則為四個介面中的一個。例如圖 1 所示示例中的方法 GetFileAsync,它返回 IAsyncOperation<StorageFile^>。許多非同步操作不返回值,且它們的型別為 IAsyncAction。可以報告進度的操作將通過 IAsyncOperationWithProgress 和 IAsyncActionWithProgress 公開。
要為非同步操作指定完成回撥,可以設定 Completed 屬性。該屬性是一個接收非同步介面和完成狀態的委託。儘管該委託可以使用函式指標進行例項化,但你通常使用 lambda(我希望到現在為此,你已經熟悉這部分的 C++11)。
要獲得操作的值,需要對介面呼叫 GetResults 方法。請注意,儘管這是從 GetFileAsync 呼叫返回給你的同樣介面,但是當你位於完成處理程式中時,你只能對它呼叫 GetResults。
完成委託的第二個引數是 AsyncStatus,它返回操作的狀態。在實際的應用程式中,你將先檢查它的值再呼叫 GetResults。在圖 1 中,為了簡單起見而省略了這部分。
你經常會發現,自己同時使用多個非同步操作。在我的示例中,我首先獲取 StorageFile 的例項(通過呼叫 GetFileAsync),然後使用 OpenAsync 開啟它,再獲取 IInputStream。接下來,我載入資料 (LoadAsync) 並使用 DataReader 進行讀取。最後,獲取字串並呼叫使用者提供的回撥函式。
組合
將操作的啟動和完成分離對於消除阻止呼叫非常重要。問題是撰寫多個基於回撥的非同步操作非常困難,並且得到的程式碼很難研究和除錯。必須採取措施控制隨之發生的“回撥亂局”。
讓我們看一個具體的示例。我想使用之前示例中的 ReadString 函式按順序在兩個檔案中進行讀取,然後將結果連線成一個字串。我打算再次將它實現為採用回撥的函式:
- template<typename Callback>
- void ConcatFiles1(String^ file1, String^ file2, Callback func)
- {
- ReadString(file1, [func](String^ str1) {
- ReadString(file2, [func](String^ str2) {
- func(str1+str2);
- });
- });
- }
效果還不錯吧?
如果你看不出這個解決方案存在的瑕疵,那麼請考慮下這個問題:什麼時候開始從 file2 進行讀取?你真的需要先讀完第一個檔案,再開始讀第二個檔案嗎?當然不是!積極啟動多個非同步操作並在資料傳入時進行處理,效果要好得多。
我們來試一試。首先,因為我併發啟動了兩個操作,並在操作完成前從函式返回,所以我需要一個特殊的堆分配物件存放中間結果。我將它命名為 ResultHolder:
- ref struct ResultHolder
- {
- String^ str;
- };
如圖 2 所示,接下來的第一個操作是設定 results->str 成員。要完成的第二個操作將用它構成最終的結果。
圖 2 併發從兩個檔案進行讀取
- template<typename Callback>
- void ConcatFiles(String^ file1, String^ file2, Callback func)
- {
- auto results = ref new ResultHolder();
- ReadString(file1, [=](String^ str) {
- if(results->str != nullptr) { // Beware of the race condition!
- func(str + results->str);
- }
- else{
- results->str = str;
- }
- });
- ReadString(file2, [=](String^ str) {
- if(results->str != nullptr) { // Beware of the race condition!
- func(results->str + str);
- }
- else{
- results->str = str;
- }
- });
- }
大多數時候這種做法都是奏效的。該程式碼有很明顯的爭用條件,並且它不處理錯誤,因此我們仍然有很多工作要做。對於結合兩個操作這麼簡單的事情,卻用了這麼多的程式碼,難免會出錯。
並行模式庫中的任務
Visual Studio 並行模式庫 (PPL) 旨在讓 C++ 中非同步並行程式的編寫變得簡單而高效。PPL 使用者可以使用諸如任務、並行演算法(例如 parallel_for 和 parallel_sort)等更高階的抽象和併發友好型容器(例如 concurrent_vector),來取代線上程和執行緒池級執行。
PPL 任務類是下一版 Visual Studio 中的新增功能,它使你可以簡潔地表示要非同步執行的單個工作單元。使用該功能可以按照獨立(或互相獨立)任務表達程式邏輯,然後讓執行時以最佳方式安排這些任務。
任務之所以這麼有用,是因為它們的可組合性。在最簡單的形式中,對於兩個任務,可以將一個任務宣告為另一個任務的延續來按順序編寫。這看起來非常簡單的結構卻允許你以有趣的方式組合多個任務。諸如聯接和選項(我稍後再進行介紹)的許多更高階 PPL 構造都是通過這個概念自我建構的。任務延續還可用於以更簡潔方式表示非同步操作的完成。讓我們重新看看圖 1 中的示例,現在使用 PPL 任務編寫它,如圖 3 所示。
圖 3 使用巢狀的 PPL 任務從檔案進行讀取
- task<String^> ReadStringTask(String^ fileName)
- {
- StorageFolder^ item = KnownFolders::PicturesLibrary;
- task<StorageFile^> getFileTask(item->GetFileAsync(fileName));
- return getFileTask.then([](StorageFile^ storageFile) {
- task<IRandomAccessStream^> openTask(storageFile->OpenAsync(
- FileAccessMode::Read));
- return openTask.then([](IRandomAccessStream^ istream) {
- auto reader = ref new DataReader(istream);
- task<UINT> loadTask(reader->LoadAsync(istream->Size));
- return loadTask.then([reader](UINT bytesRead) {
- return reader->ReadString(bytesRead);
- });
- });
- });
- }
因為我現在使用任務而不是回調錶示非同步,所以使用者提供的回撥消失了。該函式實際改為返回任務。
在實現過程中,我從 GetFileAsync 返回的非同步操作建立了 getFileTask 任務,然後將該操作的完成設定為任務的延續(使用 then 方法)。
then 方法值得仔細研究一下。該方法的引數是 lambda 表示式。實際上,引數還可以是函式指標、函式物件或 std::function 的例項,但是因為 lambda 表示式在 PPL 中十分普遍(實際上在現代的 C++ 中也一樣),從這裡開始我將只說“lambda”,用來表示所有型別的可呼叫物件。
then 方法的返回型別是某型別 T 的任務。這種型別 T 由傳遞給 then 的 lambda 返回型別決定。在最基本的形式下,當 lambda 返回型別 T 的表示式時,then 方法返回 task<T>。例如,下面延續中的 lambda 返回了 int;因此,生成型別為 task<int>:
- task<int> myTask = someOtherTask.then([]() { return 42; });
圖 3 中使用的延續型別稍有不同。它返回一個任務並執行該任務的非同步展開,所以生成型別不是 task<task<int>>,而是 task<int>:
- task<int> myTask = someOtherTask.then([]() {
- task<int> innerTask([]() {
- return 42;
- });
- return innerTask;
- });
如果所有這些讓你覺得有點頭大,不要緊,繼續往下看。我保證在幾個具有代表意義的示例之後,立即就會豁然開朗起來的。
任務組合
根據上面部分講述的內容,繼續在檔案讀取示例的基礎上進行構建。
前面曾提到,C++ 中函式和 lambda 的所有本地變數在返回時均已丟失。要保持該狀態,你必須手動將變數複製到堆或其他某個生存期較長的儲存。這就是為什麼我之前就建立了儲存器類。在非同步執行的 lambda 中,請務必小心不要通過指標或引用捕獲外圍函式的任何狀態;否則,當函式完成時,你將隨指標終止於一個無效的記憶體位置。
我要強調的是,then 方法對非同步介面執行了展開操作,我以更簡潔的形式重寫了示例,然而成本只不過是引入了另一個儲存器結構,如圖 4 所示。
圖 4 連結多個任務
- ref struct Holder
- {
- IDataReader^ Reader;
- };
- task<String^> ReadStringTask(String^ fileName)
- {
- StorageFolder^ item = KnownFolders::PicturesLibrary;
- auto holder = ref new Holder();
- task<StorageFile^> getFileTask(item->GetFileAsync(fileName));
- return getFileTask.then([](StorageFile^ storageFile) {
- return storageFile->OpenAsync(FileAccessMode::Read);
- }).then([holder](IRandomAccessStream^ istream) {
- holder->Reader = ref new DataReader(istream);
- return holder->Reader->LoadAsync(istream->Size);
- }).then([holder](UINT bytesRead) {
- return holder->Reader->ReadString(bytesRead);
- });
- }
與圖 3 中的示例相比,這段程式碼更易於閱讀,因為它呈現的是按順序的步驟,而不是“樓梯式”的巢狀操作。
除了 then 方法,PPL 還具有一些其他組合構造。其中一個是聯接操作,由 when_all 方法實現。when_all 方法採用一系列任務然後返回生成任務,生成任務將構成任務的所有輸出收集到 std::vector 中。對於兩個引數的一般情況,PPL 具有一個簡便的表達方法:運算子 &&。
這就是我如何使用聯接運算子重新實現檔案串聯方法:
- task<String^> ConcatFiles(String^ file1, String^ file2)
- {
- auto strings_task = ReadStringTask(file1) && ReadStringTask(file2);
- return strings_task.then([](std::vector<String^> strings) {
- return strings[0] + strings[1];
- });
- }
選項操作也很有用。如果有一系列的任務,選項(通過 when_any 方法實現)在序列中第一個任務完成時完成。像聯接一樣,選項也具有一個雙引數的簡便表達方法,使用運算子 ||。
選項在冗餘或推測執行的情況下比較方便;你啟動多個任務,由要完成的第一個任務提供所需的結果。你還可以對操作新增超時設定 - 啟動一個返回任務的操作,然後將它與休眠指定時間量的任務相組合。如果休眠任務先完成,就表示你的操作超時,因此被放棄或取消。
PPL 具有另一個有助於任務可組合性的構造 (task_completion_event),你可以將它用於任務與非 PPL 程式碼的互動操作。task_completion_event 可以傳遞給執行緒或期望最後設定的 IO 完成回撥。從 task_completion_event 建立的任務在設定 task_completion_event 之後即完成。
使用 PPL 編寫非同步操作
無論何時你需要發揮硬體的最大效能,C++ 語言都是你的明智之選。其他語言在 Windows 8 中發揮各自的作用:JavaScript/HTML5 組合很適合編寫 GUI;C# 提供高效的開發人員體驗;等等。要編寫 Metro 樣式的應用程式,請使用你擅長的方法和你瞭解的方式。實際上,你可以在同一個應用程式中使用多種語言。
你經常會發現,編寫應用程式前端時使用 JavaScript 或 C# 等語言,而編寫後端元件時則使用 C++ 語言,以獲得最大效能。如果 C++ 元件匯出的操作受計算限制或受 I/O 限制,最好將該操作定義為非同步操作。
為實現之前介紹的四種 WinRT 非同步介面(IAsyncOperation、IAsyncAction、IAsyncOperationWithProgress 和 IAsyncActionWithProgress),PPL 在併發名稱空間中同時定義了 create_async 方法和 progress_reporter 類。
在最簡單的形式中,create_async 採用返回值的 lambda 或函式指標。lambda 的型別決定從 create_async 返回的介面的型別。
如果某個無引數 lambda 返回非 void 型別 T,則 create_async 返回 IAsyncOperation<T> 的實現。對於返回 void 的 lambda,生成介面為 IAsyncAction。
lambda 可以採用 progress_reporter<P> 型別的引數。該型別的例項用於將型別 P 的進度報告發布回撥用方。例如,採用 progress_reporter<int> 的 lambda 可以使用整數值報告完成百分比。這種情況下,lambda 的返回型別決定生成介面是 IAsyncOperationWithProgress<T,P> 還是 IAsyncAction<P>。參見圖 5。
圖 5 在 PPL 中編寫非同步操作
- IAsyncOperation<float>^ operation = create_async([]() {
- return 42.0f;
- });
- IAsyncAction^ action = create_async([]() {
- // Do something, return nothing
- });
- IAsyncOperationWithProgress<float,int>^ operation_with_progress =
- create_async([](progress_reporter<int> reporter) {
- for(int percent=0; percent<100; percent++) {
- reporter.report(percent);
- }
- return 42.0f;
- });
- IAsyncActionWithProgress<int>^ action_with_progress =
- create_async([](progress_reporter<int> reporter) {
- for(int percent=0; percent<100; percent++) {
- reporter.report(percent);
- }
- });
要向其他 WinRT 語言公開非同步操作,請在你的 C++ 元件中定義一個公共 ref 類,並定義一個返回四個非同步介面之一的函式。你可以在 PPL 示例包中找到有關混合 C++/JavaScript 應用程式的具體示例(要獲得該示例包,請聯機搜尋“Asynchrony with PPL”)。以下程式碼段以帶進度的非同步操作公開影象轉換例程:
- public ref class ImageTransformer sealed
- {
- public:
- //
- // Expose image transformation as an asynchronous action with progress
- //
- IAsyncActionWithProgress<int>^ GetTransformImageAsync(String^ inFile, String^ outFile);
- }
如圖 6 所示,應用程式的客戶端部分在 JavaScript 中使用 promise 物件實現。
圖 6 在 JavaScript 中使用影象轉換例程
- var transformer = new ImageCartoonizerBackend.ImageTransformer();
- ...
- transformer.getTransformImageAsync(copiedFile.path, dstImgPath).then(
- function () {
- // Handle completion…
- },
- function (error) {
- // Handle error…
- },
- function (progressPercent) {
- // Handle progress:
- UpdateProgress(progressPercent);
- }
- );
錯誤處理和取消
留心的讀者可能已經注意到,這種非同步處理到目前為止幾乎完全不涉及任何錯誤處理和取消。下面就立即開始討論這個主題!
檔案讀取例程總會不可避免地遇到不存在的檔案或因眾多原因而無法開啟的檔案。字典查詢功能將遇到不認識的字詞。影象轉換無法儘快生成結果,而被使用者取消。在這些場景中,操作在執行完預期的工作之前已經永遠終止。
在現代的 C++ 中,異常用於指示錯誤或其他異常條件。異常在單執行緒中執行非常好:當引發異常時,堆疊隨即展開,一直展開到呼叫堆疊下的適當 catch 塊。加入併發後,事情就變得雜亂了,因為從一個執行緒生成的異常不容易被另一個執行緒捕獲。
考慮任務和延續任務發生了什麼:當任務的主體引發了異常時,其執行流即被中斷,並且無法生成值。如果沒有值可以傳遞給延續任務,則延續任務不會執行。即使是不生成值的 void 任務,你也需要能夠告訴它之前的任務是否已成功完成。
這就是為什麼存在延續任務的另一種形式:對於型別 T 的任務,錯誤處理延續任務的 lambda 採用 task<T>。要獲得之前任務生成的值,必須對引數任務呼叫 get 方法。如果之前的任務已成功完成,則 get 也成功完成。否則,get 方法將引發異常。
在此我想要強調一個重點。對於 PPL 中的所有任務,包括從非同步操作建立的任務,對其呼叫 get 函式在語法上是有效的。然而,在結果可用之前,get 方法必須阻止呼叫執行緒,當然,這與我們“快而流暢”的口號是矛盾的。因此,一般不鼓勵對任務呼叫 get 方法,並且在 STA 中禁止呼叫該方法(執行時將引發“無效操作”異常)。僅當你將任務作為延續任務的引數,才能呼叫 get。圖 7 顯示了一個示例。
圖 7 錯誤處理延續任務
- task<image> take_picture([]() {
- if (!init_camera())
- throw std::exception("can’t init camera");
- return get_image();
- });
- take_picture.then([](task<image> antecedent) {
- try
- {
- image img = antecedent.get();
- }
- catch (std::exception ex)
- {
- // Handle exception here
- }
- });
- var transformer = new ImageCartoonizerBackend.ImageTransformer();
- ...
- transformer.getTransformImageAsync(copiedFile.path, dstImgPath).then(
- function () {
- // Handle completion…
- },
- function (error) {
- // Handle error…
- },
- function (progressPercent) {
- // Handle progress:
- UpdateProgress(progressPercent);
- }
- );
你程式中的每個延續任務都可能是錯誤處理延續任務,你可以選擇處理所有延續任務中的異常。然而,在由多個任務組成的程式中,處理所有延續任務中的異常可能會造成過度負載。幸運的是,這種情況不一定發生。與未處理的異常相似,沿著呼叫堆疊向下處理,直到找到捕獲它們的框架,由任務引發的異常可以“慢慢流向”鏈中的下一個延續任務(直到到達最後處理它們的位置)。並且必須對他們進行處理,如果某個異常保持未處理狀態超過了任務本可以對它完成處理的生存期,則執行時將引發“未觀察到的異常”異常。
現在讓我們回到檔案讀取示例,並針對它討論錯誤處理。由 WinRT 引發的所有異常都屬於型別 Platform::Exception,因此這也是我要在最後的延續任務中捕獲的內容,如圖 8 所示。
圖 8 使用錯誤處理從檔案讀取字串
- task<String^> ReadStringTaskWithErrorHandling(String^ fileName)
- {
- StorageFolder^ item = KnownFolders::PicturesLibrary;
- auto holder = ref new Holder();
- task<StorageFile^> getFileTask(item->GetFileAsync(fileName));
- return getFileTask.then([](StorageFile^ storageFile) {
- return storageFile->OpenAsync(FileAccessMode::Read);
- }).then([holder](IRandomAccessStream^ istream) {
- holder->Reader = ref new DataReader(istream);
- return holder->Reader->LoadAsync(istream->Size);
- }).then([holder](task<UINT> bytesReadTask) {
- try
- {
- UINT bytesRead = bytesReadTask.get();
- return holder->Reader->ReadString(bytesRead);
- }
- catch (Exception^ ex)
- {
- String^ result = ""; // return empty string
- return result;
- }
- });
- }
延續任務捕獲到異常後,將視異常為“已處理”,而延續任務則返回成功完成的任務。所以,在圖 8 中,ReadStringWithErrorHandling 的呼叫方將無法得知檔案讀取是否已成功完成。我在這裡要說的是太早處理異常並不總是好事。
取消是過早終止任務的另一種形式。與 PPL 一樣,在 WinRT 中進行取消需要雙方的協作,即操作的客戶端和操作本身。它們的作用不同:客戶端請求取消,而操作確認或拒絕請求。由於客戶端和操作之間的自然競爭,因此取消請求並不保證一定成功。
在 PPL 中,這兩種作用分別由兩個型別表示:cancellation_token_source 和 cancellation_token。前一個型別的例項用於通過呼叫 cancel 方法來請求取消。後一個型別的例項則從 cancellation_token_source 進行例項化,並作為最後一個引數傳遞給任務的建構函式(then 方法)或 create_async 方法的 lambda。
在任務的主體內部,實現可以通過呼叫 is_task_cancellation_requested 方法輪詢取消請求,並通過呼叫 cancel_current_task 方法確認請求。由於 cancel_current_task 方法在封面下引發異常,因此可以在呼叫 cancel_current_task 之前進行一些資源清理。圖 9 顯示了一個示例。
圖 9 任務中取消以及對取消請求的反應
- cancellation_token_source ct;
- task<int> my_task([]() {
- // Do some work
- // Check if cancellation has been requested
- if(is_task_cancellation_requested())
- {
- // Clean up resources:
- // ...
- // Cancel task:
- cancel_current_task();
- }
- // Do some more work
- return 1;
- }, ct.get_token());
- ...
- ct.cancel(); // attempt to cancel
請注意,許多工都可以通過相同的 cancellation_token_source 取消。這對於處理任務鏈和任務圖形時非常方便。你可以取消指定的 cancellation_token_source 管理的所有任務,而無需單獨地取消每一個任務。當然,不保證所有任務都能實際響應取消請求。此類任務將完成,但是它們正常(基於值)的延續任務不會執行。錯誤處理延續任務將執行,但在嘗試從之前任務獲取值時將引發 task_canceled 異常。
最後,讓我們看一下對生產方使用取消令牌。create_async 方法的 lambda 可以採用 cancellation_token 引數,使用 is_canceled 方法對該引數進行輪詢,並在響應取消請求時取消該操作:
- IAsyncAction^ action = create_async( [](cancellation_token ct) {
- while (!ct.is_canceled()); // spin until canceled
- cancel_current_task();
- });
- ...
- action->Cancel();
請注意,在任務延續的情況下,由 then 方法接收取消令牌,而對於 create_async,取消令牌則傳遞到 lambda。在後一種情況下,通過對生成的非同步介面呼叫 cancel 方法啟動取消,然後由 PPL 通過取消令牌直接將它插入取消請求。
總結
如同 Tony Hoare 曾經嘲笑的一樣,我們需要教育我們的程式“等待快一點”。然而,不等待的非同步程式設計仍然很難掌控,並且其優勢也不是非常明顯,因此開發人員不使用它。
在 Windows 8 中,所有阻止操作都是非同步的。如果你是一名 C++ 程式設計師,PPL 可以使非同步程式設計非常愉快。擁抱非同步世界吧,告訴你的程式等待再快一點!
趕緊下載VS11體驗吧
http://www.microsoft.com/click/services/Redirect2.ashx?CR_CC=200098144