1. 程式人生 > >C++ concurrency::task實現非同步程式設計(Windows)

C++ concurrency::task實現非同步程式設計(Windows)

最近一直在看js、python、lua等指令碼語言的非同步程式設計,今天腦子一熱突然想看看C++能否也有支援非同步的相關類庫,上網一搜還真的有

主要使用task class 及其相關型別和函式,它們都包含在 concurrency 名稱空間中且在 <ppltasks.h> 中定義。concurrency::task 類是一個通用型別,但是當使用 /ZW 編譯器開關(對於 Windows 執行時應用和元件而言是必需的)時,任務類會封裝 Windows 執行時非同步型別,以便更容易地完成以下工作:
1.將多個非同步和同步操作結合在一起
2.在任務鏈中處理異常
3.在任務鏈中執行取消
4.確保單個任務在相應的執行緒上下文或單元中執行

簡單的一個小demo:

#include "stdafx.h"
#include <numeric>
#include <ppltasks.h>
#include <sstream>
#include <array>  
#include <iostream>
#include <time.h>  
using namespace Concurrency;
using namespace std;

int main()
{

	array<task<int>, 3> tasks = {
		task<int>([] { Sleep(3000); cout << "the f1 output\n"; Sleep(10000);  return 100; }),
		task<int>([] { Sleep(1000); cout << "the f2 output\n" ; Sleep(10000); return 10; }),
		task<int>([] { Sleep(2000); cout << "the f3 output\n" ; Sleep(10000); return 1; })
	};

	//when_all().wait():容器裡面所有的task都被執行後,才繼續向下執行。
    //when_any().wait():容器裡第一個task完成之後,就繼續向下執行。
	auto TaskList = when_all(tasks.begin(), tasks.end())
		.then([](vector<int> results)
	{
		cout << "The sum is "
			<< accumulate(results.begin(), results.end(), 0)
			<< endl;
	});

	time_t t1 = time(NULL);

	cout << "begin\n";

	TaskList.wait();

	cout << "end\n";

	time_t t2 = time(NULL);

	cout << "the runtime length " << t2 - t1 << "s " << endl;

	system("pause");
}



我們定義了三個task,且每個return前都又10s的sleep來代表io操作,其結果總耗時為最長執行的f1(sleep(3000)+sleep(10000))。

官網文件相關說明:

通過任務使用非同步操作

以下示例說明如何利用任務類來使用返回 IAsyncOperation 介面且其操作會生成一個值的async方法。以下是基本步驟:
呼叫 create_task 方法並將其傳遞到 IAsyncOperation^ 物件。
呼叫任務上的成員函式 task::then 並且提供一個將在非同步操作完成時呼叫的 lambda。

#include <ppltasks.h>
using namespace concurrency;
using namespace Windows::Devices::Enumeration;
...
    void App::TestAsync()
{    
    //Call the *Async method that starts the operation.
    IAsyncOperation<DeviceInformationCollection^>^ deviceOp =
        DeviceInformation::FindAllAsync();

    // Explicit construction. (Not recommended)
    // Pass the IAsyncOperation to a task constructor.
    // task<DeviceInformationCollection^> deviceEnumTask(deviceOp);

    // Recommended:
    auto deviceEnumTask = create_task(deviceOp);

    // Call the task’s .then member function, and provide
    // the lambda to be invoked when the async operation completes.
    deviceEnumTask.then( [this] (DeviceInformationCollection^ devices ) 
    {		
        for(int i = 0; i < devices->Size; i++)
        {
            DeviceInformation^ di = devices->GetAt(i);
            // Do something with di...			
        }		
    }); // end lambda
    // Continue doing work or return...
}

由 task::then 函式建立並返回的任務稱為延續。 使用者提供的 lambda 輸入引數(在此情況下)是任務操作在完成時產生的結果。它與你在直接使用 IAsyncOperation 介面時通過呼叫 IAsyncOperation::GetResults 檢索到的值相同。
task::then 方法立即返回,並且其代理直至非同步工作成功完成後才執行。 在本示例中,如果非同步操作導致引發異常,或由於取消請求而以取消狀態結束,則延續永遠不會執行。 稍後,我們將介紹如何編寫即使上一個任務被取消或失敗也會執行的延續。
儘管你在本地堆疊上宣告任務變數,但它仍然管理其生存期,這樣在其所有操作完成並且對其的所有引用離開作用域之前都不會被刪除(即使該方法在操作完成之前返回)。


建立任務鏈

在非同步程式設計中,常見的做法是定義一個操作序列,也稱作任務鏈,其中每個延續只有在前一個延續完成後才能執行。在某些情況下,上一個(或先行)任務會產生一個被延續接受為輸入的值。通過使用 task::then 方法,你可以按照直觀且簡單的方式建立任務鏈;該方法返回一個 task<T>,其中 T 是 lambda 函式的返回型別。你可以將多個延續組合到一個任務鏈中: myTask.then(…).then(…).then(…);
當延續建立一個新的非同步操作時,任務鏈尤其有用;此類任務稱為非同步任務。以下示例將介紹具有兩個延續的任務鏈。初始任務獲取一個現有檔案的控制代碼,當該操作完成後,第一個延續會啟動一個新的非同步操作來刪除該檔案。當該操作完成後,第二個延續將執行,並且輸出一條確認訊息。

#include <ppltasks.h>
using namespace concurrency;
...
void App::DeleteWithTasks(String^ fileName)
{    
    using namespace Windows::Storage;
    StorageFolder^ localFolder = ApplicationData::Current::LocalFolder;
    auto getFileTask = create_task(localFolder->GetFileAsync(fileName));

    getFileTask.then([](StorageFile^ storageFileSample) ->IAsyncAction^ {       
        return storageFileSample->DeleteAsync();
    }).then([](void) {
        OutputDebugString(L"File deleted.");
    });
}

上一個示例說明了四個要點:
第一個延續將 IAsyncAction^ 物件轉換為 task<void> 並返回 task。
第二個延續執行無錯誤處理,因此接受 void 而非 task<void> 作為輸入。 它是一個基於值的延續。
第二個延續直至 DeleteAsync 操作完成後才執行。
因為第二個延續是基於值的,所以如果通過呼叫 DeleteAsync 啟動的操作引發異常,則第二個延續根本不會執行。

注意  建立任務鏈只是使用 task 類組合非同步操作的一種方式。還可以通過使用連線和選擇運算子 && 和 || 來組合操作。有關更多資訊,請參閱任務並行度(併發執行時)。

Lambda 函式返回型別和任務返回型別

在任務延續中,lambda 函式的返回型別包含在 task 物件中。如果該 lambda 返回 double,則延續任務的型別為 task<double>。 但是,任務物件的設計目的是為了不生成無需巢狀的返回型別。如果 lambda 返回 IAsyncOperation<SyndicationFeed^>^,則延續返回 task<SyndicationFeed^>,而不是 task<task<SyndicationFeed^>> 或 task<IAsyncOperation<SyndicationFeed^>^>^。此過程稱為非同步解包,並且它還確保延續內部的非同步操作在呼叫下一個延續之前完成。
請注意,在上一個示例中,即使其 lambda 返回 IAsyncInfo 物件,該任務也仍然會返回 task<void>。下表總結了在 lambda 函式和封閉任務之間發生的型別轉換:


取消任務


為使用者提供取消非同步操作的選項通常是一個不錯的方法。另外,在某些情況下,你可能必須以程式設計方式從任務鏈外部取消操作。儘管每個 *Async 返回型別都具有一個從 IAsyncInfo 繼承的 Cancel 方法,但將其公開給外部方法是不可取的。在任務鏈中支援取消的首選方式是使用 cancellation_token_source 建立 cancellation_token,然後將該令牌傳遞給初始任務的建構函式。如果使用取消令牌建立非同步任務,並且呼叫 cancellation_token_source::cancel,則該任務會自動對 IAsync* 操作呼叫 Cancel,並且將取消請求沿著其延續鏈向下傳遞。下面的虛擬碼演示了基本方法。

//Class member:
cancellation_token_source m_fileTaskTokenSource;

// Cancel button event handler:
m_fileTaskTokenSource.cancel();

// task chain
auto getFileTask2 = create_task(documentsFolder->GetFileAsync(fileName), 
                                m_fileTaskTokenSource.get_token());
//getFileTask2.then ...

取消任務後,task_canceled 異常將沿著任務鏈向下傳播。基於值的延續將不執行,但是基於任務的延續將在呼叫 task::get 時導致引發異常。如果存在錯誤處理延續,請確保它明確捕獲 task_canceled 異常。(此異常不是派生自 Platform::Exception。)
取消是一種協作式操作。如果你的延續要執行一些長時間的工作,而不只是呼叫 Windows 執行時方法,則你需要負責定期檢查取消令牌的狀態,並且在其被取消後停止執行。在清理延續中分配的所有資源之後,請呼叫 cancel_current_task 以取消該任務並將取消向下傳播到後跟的任何基於值的延續。下面是另外一個示例:你可以建立一個任務連結串列示 FileSavePicker 操作的結果。如果使用者選擇“取消”按鈕,則不會呼叫 IAsyncInfo::Cancel 方法。相反,操作成功,但返回 nullptr。 延續可以測試輸入引數,如果輸入是 nullptr,則呼叫 cancel_current_task。
有關更多資訊,請參閱 PPL 中的取消


在任務鏈中處理錯誤

如果希望讓一個延續即使在先行被取消或引發異常的情況下也能夠執行,請將該延續的 lambda 函式的輸入指定為 task<TResult> 或 task<void>,使該延續成為基於任務的延續,但前提是先行任務的 lambda 返回 IAsyncAction^。
若要在任務鏈中處理錯誤和取消,則無需使每個延續成為基於任務的延續,也不用將每個可能引發異常的操作封裝在 try…catch 塊中。相反,你可以將基於任務的延續新增至任務鏈的末尾,並且在那裡處理所有錯誤。任何異常(包括 task_canceled 異常)都將沿著任務鏈向下傳播並繞過所有基於值的延續,因此你可以在基於任務的錯誤處理延續中處理這些異常。我們可以重寫上一個示例以使用基於任務的錯誤處理延續:

#include <ppltasks.h>
void App::DeleteWithTasksHandleErrors(String^ fileName)
{    
    using namespace Windows::Storage;
    using namespace concurrency;

    StorageFolder^ documentsFolder = KnownFolders::DocumentsLibrary;
    auto getFileTask = create_task(documentsFolder->GetFileAsync(fileName));

    getFileTask.then([](StorageFile^ storageFileSample)
    {       
        return storageFileSample->DeleteAsync();
    })

    .then([](task<void> t) 
    {

        try
        {
            t.get();
            // .get() didn't throw, so we succeeded.
            OutputDebugString(L"File deleted.");
        }
        catch (Platform::COMException^ e)
        {
            //Example output: The system cannot find the specified file.
            OutputDebugString(e->Message->Data());
        }

    });
}

在基於任務的延續中,我們呼叫成員函式 task::get 以獲取任務結果。即使該操作是不產生任何結果的 IAsyncAction,我們仍然需要呼叫 task::get,因為 task::get 也會獲取已經傳輸到該任務的所有異常。如果輸入任務正在儲存某個異常,則該異常將在呼叫 task::get 時被引發。 如果不呼叫 task::get,或者不在任務鏈的末尾使用基於任務的延續,或者不捕獲所引發的異常型別,則當對該任務的所有引用都被刪除後,會引發 unobserved_task_exception。
請只捕獲你可以處理的異常。如果你的應用遇到無法恢復的錯誤,則最好讓該應用崩潰,而不要讓其繼續在未知狀態下執行。另外,一般情況下,不要嘗試捕獲 unobserved_task_exception 本身。 該異常主要用於診斷目的。當引發 unobserved_task_exception 時,通常表示程式碼中存在錯誤。 原因通常是應該處理的異常,或由程式碼中的某個其他錯誤導致的不可恢復的異常。


管理執行緒上下文

Windows 執行時應用的 UI 在單執行緒單元 (STA) 中執行。其 lambda 返回 IAsyncAction 或 IAsyncOperation 的任務具有單元意識。如果該任務是在 STA 中建立的,則預設情況下,除非你另外指定,否則該任務的所有要執行的延續也將在該 STA 中執行。 換句話說,整個任務鏈從父任務繼承單元意識。該行為可幫助簡化與 UI 控制元件的互動,這些 UI 控制元件只能從 STA 訪問。
例如,在 Windows 執行時應用中,在任何表示 XAML 頁面的類的成員函式中,你可以從 task::then 方法內部填充 ListBox 控制元件,而無需使用 Dispatcher 物件。

#include <ppltasks.h>
void App::SetFeedText()
{    
    using namespace Windows::Web::Syndication;
    using namespace concurrency;
    String^ url = "http://windowsteamblog.com/windows_phone/b/wmdev/atom.aspx";
    SyndicationClient^ client = ref new SyndicationClient();
    auto feedOp = client->RetrieveFeedAsync(ref new Uri(url));

    create_task(feedOp).then([this]  (SyndicationFeed^ feed) 
    {
        m_TextBlock1->Text = feed->Title->Text;
    });
}

如果任務不返回 IAsyncAction 或 IAsyncOperation,則它不具有單元意識,並且預設情況下,其延續在第一個可用的後臺執行緒上執行。
你可以使用 task_continuation_context 的接受 task::then 的過載來覆蓋任一種任務的預設執行緒上下文。例如,在某些情況下,在後臺執行緒上計劃具有單元意識的任務的延續或許是可取的。在此情況下,你可以傳遞 task_continuation_context::use_arbitrary 以便在多執行緒單元中的下一個可用執行緒上計劃該任務的工作。這可以改善延續的效能,因為其工作不必與 UI 執行緒上發生的其他工作同步。
以下示例將演示何時指定 task_continuation_context::use_arbitrary 選項是有用的,還說明了預設延續上下文在同步非執行緒安全集合上的併發操作方面是多麼有用。在此段程式碼中,我們遍歷 RSS 源的 URL 列表,併為每個 URL 啟動一個非同步操作以檢索源資料。 我們無法控制檢索訂閱的順序,而我們其實並不關心。當每個 RetrieveFeedAsync 操作完成後,第一個延續接受 SyndicationFeed^ 物件並使用它來初始化應用定義的 FeedData^ 物件。因為上述每個操作都獨立於其他操作,所以我們可以通過指定 task_continuation_context::use_arbitrary 延續上下文來提高執行速度。 但是,在初始化每個 FeedData 物件之後,我們必須將其新增至一個不是執行緒安全集合的 Vector 中。因此,我們要建立一個延續並且指定 task_continuation_context::use_current 以確保所有對 Append 的呼叫都發生在同樣的應用程式單執行緒單元 (ASTA) 上下文中。由於 task_continuation_context::use_default 是預設上下文,因此我們無需進行明確指定,但是此處為了清楚起見而予以指定。

#include <ppltasks.h>
void App::InitDataSource(Vector<Object^>^ feedList, vector<wstring> urls)
{
				using namespace concurrency;
    SyndicationClient^ client = ref new SyndicationClient();

    std::for_each(std::begin(urls), std::end(urls), [=,this] (std::wstring url)
    {
        // Create the async operation. feedOp is an 
        // IAsyncOperationWithProgress<SyndicationFeed^, RetrievalProgress>^
        // but we don’t handle progress in this example.

        auto feedUri = ref new Uri(ref new String(url.c_str()));
        auto feedOp = client->RetrieveFeedAsync(feedUri);

        // Create the task object and pass it the async operation.
        // SyndicationFeed^ is the type of the return value
        // that the feedOp operation will eventually produce.

        // Then, initialize a FeedData object by using the feed info. Each
        // operation is independent and does not have to happen on the
        // UI thread. Therefore, we specify use_arbitrary.
        create_task(feedOp).then([this]  (SyndicationFeed^ feed) -> FeedData^
        {
            return GetFeedData(feed);
        }, task_continuation_context::use_arbitrary())

        // Append the initialized FeedData object to the list
        // that is the data source for the items collection.
        // This all has to happen on the same thread.
        // By using the use_default context, we can append 
        // safely to the Vector without taking an explicit lock.
        .then([feedList] (FeedData^ fd)
        {
            feedList->Append(fd);
            OutputDebugString(fd->Title->Data());
        }, task_continuation_context::use_default())

        // The last continuation serves as an error handler. The
        // call to get() will surface any exceptions that were raised
        // at any point in the task chain.
        .then( [this] (task<void> t)
        {
            try
            {
                t.get();
            }
            catch(Platform::InvalidArgumentException^ e)
            {
                //TODO handle error.
                OutputDebugString(e->Message->Data());
            }
        }); //end task chain

    }); //end std::for_each
}


巢狀任務(即在延續內部建立的新任務)不繼承初始任務的單元意識。


處理進度更新

在操作完成之前,支援 IAsyncOperationWithProgress 或 IAsyncActionWithProgress 的方法會在操作執行過程中定期提供進度更新。進度報告獨立於任務和延續概念。 你只需為物件的 Progress 屬性提供委派。委派的一個典型用途是更新 UI 中的進度欄。