1. 程式人生 > >使用併發與協調執行時

使用併發與協調執行時

介紹

併發與協調執行時(Concurrency and Coordination Runtime,CCR)是一個.NET平臺上的非同步資訊傳遞類庫,提供了一套細小而強大的基礎功能,能夠使用不同的方式來組織應用程式。應用程式通過有效使用CCR可以獲得更好的響應能力,以及更好的伸縮性及容錯性。而它最神奇的地方則在於,開發人員獲得這些便利的同時,還減少(甚至完全消除)了對執行緒、鎖、互斥體(mutex)或其他同步元素的直接操作(或捕獲錯誤)。

如果您的應用程式是單執行緒的,CCR可以使您的程式提高響應能力,並且更充分利用CPU核心——同時讓您的程式碼庫從概念上保持整潔。如果您的程式已經啟用了多執行緒,那麼CCR能夠在簡化您程式碼庫的同時,保持(甚至改進)程式的吞吐量。

簡單說來,CCR提供瞭如下功能:

  • 一個簡單而高效能的訊息傳遞實現。使用非常輕量級且型別安全的通道,以面向Action的視角把各種物件連線了起來。
  • 一套基礎的排程約束。排程幾乎就是CCR的全部。您可以建立各種任務,把訊息傳遞給程序中的其他元件,並且通過一些叫做仲裁器(Arbiter)的物件來宣告一些約束,以此對請求進行處理。在執行您的程式碼之前,CCR能夠保證這些約束已經得到滿足。
  • 為失敗處理提供了一種更好的模型。CCR提供了“Causality”,意味著為一系列相關非同步子任務提供一個上下文,這樣某個任務失敗時(例如丟擲了異常),那麼這樣的失敗能在單獨的地方進行隔離處理,這與發起任務的執行緒並沒有關係。
  • 更好的使用已有的(或將來會有的)計算能力。CCR能夠使用現有的執行緒池進行排程,如果您需要的話,也可以使用它自帶的實現,這在某些情況下能夠帶來更好的效能。自然,這個機制只會對您的程式碼產生細微的影響。
  • 使非同步IO操作得以簡化。提高獨立程序的伸縮性與效能的關鍵,最終往往歸結於高效的I/O密集型操作。I/O密集型操作往往會比計算密集型操作要慢許多倍,而一個阻塞的I/O操作會浪費有效的資源(在這裡就是執行緒),使它們無法處理其他等待的任務。使用非同步的I/O操作能夠釋放這些資源,讓它們去處理其他任務,直到I/O操作完成。然而,一系列的非同步操作需要將其“開始”及“完成”分開,這使編碼難度變得很大。CCR使用特別的C#迭代器實現,使開發人員能夠輕鬆控制此類操作。

通過“非同步訊息傳遞”機制,我們的元件通過傳送資料的方式與另一個元件進行通訊,更值得一提的是,資料與後續回覆沒有確定的臨時關係。一個已傳送的訊息可能需要在未來某個時候才會得到處理,也只有到那個時候訊息才會得到回覆。

這種非同步訊息傳遞模型是大部分情況下跨程序(inter-process)計算的基礎,不過與之相比,在現實應用中使用CCR來進行純粹的程序內部(intra-process)通訊往往可以得到更好的保證,不像前者在很多情況下都可能失敗。因此,CCR不僅可以用於低階I/O操作,對於構建伸縮性強的分散式系統也可以提供很好的輔助。

CCR的基礎型別

CCR由幾種基礎型別組成:

  1. 任務(Task):任意一段用於執行的程式碼。
  2. 任務佇列(TaskQueue):確切地說,是“分發佇列(DispatcherQueue)”。一個待執行任務的列表。一個CLR執行緒池或CCR執行緒池(即Dispatcher)的執行緒會從佇列中取出任務並加以執行。
  3. 埠(Port):程序內部連線各元件的訊息佇列。簡單來說這只是個連結串列(linked-list),生成者在埠內放置訊息。CCR提供了泛型的過載,可以提供型別安全的操作。
  4. 仲裁器(Arbiters):CCR的基礎元素,它們將埠和任務連線起來。它們定義了一些約束,用於在訊息到達埠後確定需要建立的任務,以及使用哪個任務佇列進行接受。CCR中內建了多種仲裁器,其中大部分可以進行組合,以此獲得更靈活的功能。

瞭解了這些基本概念之後,我們來看一些簡單的CCR程式碼。首先,我們來定義一個簡單的C#控制檯應用程式,它會用來執行所有的示例。注意在這個程式中,我們使用了CCR自定義的執行緒池(Dispatcher)並與我們的任務佇列繫結。這意味著佇列中任務會被自定義執行緒池中的執行緒執行。

static void Main(string[] args)
{
    using (var dr = new Dispatcher())
    {
        using (var taskQueue = new DispatcherQueue("samples", dr))
        {
            // Examples will go here...


// Need a blocking call to prevent the application
// exiting.
Console.ReadLine(); } } }
儘管示例中只使用了一個任務佇列,但是在實際應用中還是建議使用多個佇列。CCR在獲取任務時會使用輪詢策略來訪問多個任務佇列,避免任何一個佇列處於飢餓狀態。

首先,我們直接把一個任務放入佇列。這是CCR中執行一個任務最簡單的方法,我們這裡連埠都沒有用到。Arbiter類包含了一系列簡化開發的方法,例如FromHandler直接從一個委託物件來建立任務——在這裡我們使用匿名方法來構建該物件。這樣任務就被放入了任務佇列,可以由分發器來執行了。

    // Enqueue a task directly
    taskQueue.Enqueue(Arbiter.FromHandler(() =>
        Console.WriteLine("Hello, world.")));

大多數情況下我們不太會如此直接地向佇列中放入任務,一般來說總有一個埠在工作。在下一段程式碼中,我們會定義一個埠,一個仲裁器,然後向端口裡傳送訊息。這個示例中我們使用String強型別的埠,這樣委託的簽名也需要接受一個字串。

// Post a message to a port to schedule a task.
var port = new Port<string>();
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));
port.Post("Hello (again), world");

這裡發生了一些不那麼簡單的東西,需要花一點時間才能瞭解整個過程。port.Receive()呼叫建立了一個可能是最簡的仲裁器,或者說是一個“接受者”,一個訊息達到埠時它即會生效。訊息到達之後,便會建立一個任務,這個任務的功能是呼叫委託物件,並使用剛才的訊息作為引數。 Arbiter.Activate()呼叫將建立的任務和特定的任務佇列繫結在一起。

要理解CCR仲裁器,最關鍵的一點是它們永遠不會阻塞執行緒。一旦接收器無法獲得資料時,執行緒就會被釋放,可用於處理其他正在等待的任務。

仲裁器可以在任何時間建立,這是CCR中一個重要的概念。因此,如下所示,即使我們把上面示例中最後兩行程式碼的次序交換,其效果也是一樣的。

// Post a message to a port to schedule a task.
var port = new Port();
port.Post("Hello (again), world");
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));

現在我們來少許修改一下示例——我們向埠中放入兩條訊息再使接收器生效,來看看會發生什麼……

// Post a message to a port to schedule a task.
var port = new Port();
port.Post("Hello (again), world");
port.Post("Hello (thrice), world");
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));

現在如果您執行程式,就會發現只打印了一條訊息。這是因為port.Receive()呼叫是一個擴充套件方法,它簡化了以下語法,但是並不完全相等:

    Arbiter.Activate(
        taskQueue,
        Arbiter.Receive(false, port, Console.WriteLine));

這裡最為關鍵的是傳遞給Arbiter.Receive()呼叫的第一個(Boolean)引數。它表明這個接收器是臨時的,處理完一條訊息後就會拋棄。如果我們希望處理所有達到該埠的訊息,我們可以將這個引數設為true。

    // Post a message to a port to schedule a task.
    var port = new Port();
	port.Post("Hello (again), world");
	port.Post("Hello (thrice), world");
    Arbiter.Activate(
        taskQueue,
        Arbiter.Receive(true, port, Console.WriteLine));

上面的程式碼有時候會打印出奇怪的結果——兩行內容的順序不一致了。這究竟是怎麼回事呢?

在CCR中,一旦某個仲裁器(這裡是個“接受”操作)被滿足之後,它會建立一個任務來處理相關訊息。除非這個仲裁器被巢狀在另一個更大的組合內,這些任務都會被放入任務佇列中等待執行。在我們上面的示例中,這個持久地接受器會立即分別為兩條訊息產生一個任務。當存在可用執行緒的時候,這兩個任務將併發被處理,因此並不保證兩者的順序。

CCR執行緒池的實現與CLR執行緒池有幾點不同。最重要的一點是它包含固定數量的執行緒,這在建立時便確定下來。如果執行緒執行的操作不會阻塞,那麼就不會有什麼問題。但是如果您必須發起阻塞的請求,那還是使用CLR執行緒池對任務佇列進行排程為好,因為它能夠動態的增長和收縮。這樣的任務佇列可以使用 DispatcherQueue預設建構函式來建立。

有幾種辦法可以保證訊息的順序。可能最簡單的方法就是在迴圈中使用一個臨時接收器,這樣就能一次只處理一條訊息。幸運的是,CCR包含了一個叫做迭代式任務(Iterative Task)的強大的機制,可以讓我們較為自然的實現這個要求。這需要使用C#迭代器功能,我們來看一個示例:

首先,我們將目前的Arbiter.Activate替換成以下呼叫:

Arbiter.Activate(
	taskQueue,
	new Arbiter<Arbiter<string>>(port, ProcessMessages));

這段程式碼建立了一個名為ProcessMessages的迭代式任務,定義如下:

static IEnumerator<ITask> ProcessMessages(Port<string> port)
{
        while (true)
		yield return port.Receive(Console.WriteLine);
}

這個方法為一個無限迴圈,等待(但不阻塞)接受操作以獲得滿足。接受到訊息時委託將被呼叫,並繼續迴圈。如果我們希望在埠接受到一個空字串時跳出迴圈,我們可以編寫如下程式碼(請注意我們使用了Lambda表示式構建了一個匿名委託來處理訊息):

    static IEnumerator> ProcessMessages(Port port)
{
bool fDone = false;
while (!fDone)
{
yield return port.Receive(message =>
{
if (String.IsNullOrEmpty(message))
fDone = true;
else
Console.WriteLine(message);
});
}
Console.WriteLine("Finished");
}

迭代器是CCR工具箱中非常強大的工具——它大大簡化了順序呼叫的非同步操作的編碼工作,使非阻塞操作的使用非常接近於同步呼叫方式。例如,一個級別高的任務可以返回如下的ProcessMessages():

static IEnumerator<ITask> TopLevelTask(Port<static> port)
{
   // Yield to the nested task
   yield return new IterativeTask<string>>(port, ProcessMessages);

   Console.WriteLine("Finished nested task.");
}

到目前為止,我們只看到了簡單接受器的使用——當單個訊息到達單個埠時仲裁器便會安排一個任務。那麼現在就可以來看一下更高階的仲裁方式了——其提供了一種黏著劑,可以將接受器進行巢狀以建立更強大的功能。

“選擇器(Choice)”是其中最常用的功能之一,它會從多個接受器選擇一個,並且只選擇其中一個接受器來處理。例如,以下迭代任務會在處理前等待一個字串或一個訊號。

static IEnumerator<ITask> ProcessChoice(PortSet<string, EmptyValue> portSet)
{
     bool fDone = false;
     while (!fDone)
     {
         yield return portSet.Choice(
             message => Console.WriteLine(message),
             signal => fDone = true);
     }
     Console.WriteLine("Finished");
}

選擇器一般用於確定一個非同步操作的成功或者失敗,不過您也可以用它來選擇一個或任意數量的選擇器。

埠集(PortSet)是對一個或多個獨立埠的包裝,使它們能作為一個整體來接受訊息。一個典型的示例便是CCR中的SuccessFailurePort,它繼承了PortSet。

另一個常用的仲裁器是級聯(Join)。它會在兩個內嵌的接受器都得到滿足的情況下被啟用。下面的示例便演示了這種方式:

var port1 = new Port<int>();
var port2 = new Port<int>();
port1.Post(1);
port2.Post(3);
Arbiter.Activate(
    taskQueue,
    Arbiter.JoinedReceive(false, port1, port2, (x, y) =>
    {
        Console.WriteLine(x + y);
    }));

在資源有限的情況下,級聯可以非常有效地控制訪問。第一個埠包含了對資源的請求,另一個則包含了有效的資源。使用級聯之後,我們可以限制請求只在有空閒資源的時候才進行處理。另一個高級別的仲裁方式是“交織(Interleave)”。這在概念上與讀寫鎖(read-write lock)較為接近,只能在非阻塞的非同步語法中使用。讀取任務能夠與其他讀取任務同時執行,但是寫入任務(它比讀取的優先順序高)只能在沒有其他任務執行的情況下進行。以下是這種仲裁器的宣告,它用於保護某種概念上的“快取”:

var updatePort = new Port<UpdateCache>();
var queryPort = new Port<QueryCache>();
var stopPort = new Port<Shutdown>();


var interleave = new Interleave(
   new TeardownReceiverGroup(
         Arbiter.Receive(false, stopPort, ClearDownCache)),
   new ExclusiveReceiverGroup(
         Arbiter.Receive(true, updatePort, OnUpdateCache)),
   new ConcurrentReceiverGroup(
         Arbiter.Receive(true, queryPort, OnQueryCache)));
Arbiter.Activate(taskQueue, interleave);

在這裡,持久接受器被放入合適的組中,這便開始了“交織”仲裁。任何屬於ConcurrentReceiverGroup()的接受器能夠讓關聯的任務之間併發執行。相反,ExclusiveReceiverGroup中的接受器只能獨立於其他接受器執行。此外對於放入該組的接受器,我們可以限制它們完全按照訊息傳遞的順序來執行任務。任何屬於TeardownReceiverGroup組中的接受器會在關閉“交織”仲裁時,也就是在最後被呼叫——因此這樣的接受器不能是持久化的。

“交織”仲裁使用輪詢的方式對各接受器進行較為公平的排程。此外關於執行的順序,即使在ExclusiveReceiverGroup內部也是各埠獨立的。對於傳送至相互無關的埠的兩條訊息,它們的執行順序並不確保與它們的到達順序相同。

之前提到過,CCR迭代任務能讓我們用接近於普通阻塞式同步操作的方式,來編寫邏輯上是順序執行的非阻塞非同步操作。這樣的非同步操作一般為I/O密集型操作,可能是一個Web請求,一個數據庫操作,或基礎檔案I/O等。由於現在我們可以更好地控制這些操作,現在編寫這種非同步I/O操作變得愈發簡單,並且能有效地提高應用程式的吞吐量和伸縮性。

在APM世界中連線BeginXXX和EndXXX的重要模式為AsyncCallback委託,它的形式是:

    public delegate void AsyncCallback(IAsyncResult ar);

CCR便基於此,Port的Post操作與之正好吻合。使用這種這種方式,我們可以用如下的迭代任務進行非同步檔案複製:

static IEnumerator<ITask> Copy(FileStream source, FileStream target)
    {
        var buffer = new byte byte[128 * 1024];
        var port = new Port>();
        var bytesRead = 0;


        do
        {
            source.BeginRead(buffer, 0, buffer.Length, port.Post, null);
            yield return Arbiter.Receive(
                false, port, iar => bytesRead = source.EndRead(iar));
            target.BeginWrite(buffer, 0, bytesRead, port.Post, null);
            yield return Arbiter.Receive(
                false, port, iar => target.EndWrite(iar));
        } while (bytesRead > 0);
    }

從根本上說,這些非同步操作在完成時都會向我們的埠中傳遞一個IAsyncResult物件。在這裡,我們會等待埠的接受操作得到滿足,這意味著非同步操作已經完成,可以接著下一步繼續開始。這完全是一個非同步實現(我們完全可能使用少量的執行緒來執行數千個這樣的操作),但是我們這些程式碼的意圖可謂一目瞭然 ——我們讀取一塊資料,再寫入一塊資料,反覆不斷直到完成。

為了使檔案複製的示例儘可能保持簡單,上面的程式碼省略了處理異常的程式碼,不過強壯的程式碼必須能夠處理讀寫操作可能引發的異常。在這裡無法運用普通的 try/catch,因為這種異常處理方式具有執行緒執行緒相關性,而CCR任務可能被運行於任何可用的執行緒之上——事實上對於這種迭代任務,任務的每個“步驟”都可能與之前在不同的執行緒上執行。

在CCR中處理異常有兩種基本的方式。第一種是顯式地在每個操作中將錯誤進行捕獲,並由埠傳播出去。然而這會讓呼叫者和被呼叫者雙方造成可觀的程式碼膨脹。如果要在前例中進行顯式地錯誤處理,檔案訪問方式就會發生根本性的改變。下面的程式碼展示了檔案讀取方面需要進行的改變:

       static IEnumerator<ITask> Copy(
           FileStream source, FileStream target,
           SuccessFailurePort resultPort)
       {
           var buffer = new byte [128 * 1024];
           var port = new Port<IAsyncResult>();
           var bytesRead = 0;

           do
           {
               // Deal with a failure on the BeginRead
               try
               {
                   source.BeginRead(buffer, 0, buffer.Length, port.Post, null);
               }
               catch (Exception e)
               {
                   resultPort.Post(e);
                   yield break;
               }

               // Deal with a failure on the EndRead
               yield return Arbiter .Receive(
                   false, port, iar =>
                   {
                       try
                       {
                           bytesRead = source.EndRead(iar);
                       }
                       catch (Exception  e)
                       {
                           resultPort.Post(e);
                       }
                   });

               // Stop processing if there is a failure.
               if (bytesRead == 0)
                   yield break;

               // And so on for the write...
           } while (bytesRead > 0);
           resultPort.Post(new SuccessResult ());
       }

很顯然,這個方式十分笨重,而且非常容易出錯——這不是程式碼中應該使用的做法,我們要把錯誤處理路徑獨立出來。

值得慶幸的是,CCR為錯誤處理提供了一種乾淨而強大的支援,這種機制被稱作為“因果關係(Causality)”。它不僅讓我們的程式碼從顯式處理異常的繁雜中釋放出來,還在聲明後繼續保持原有任務的執行路徑。這使我們能夠建立統一的錯誤處理程式碼來應對任意複雜的非同步操作。

建立Causality的典型方式是在初始化分發器之後為它的異常埠附加一個委託物件,以此來通知CCR處理異常的方式。

    var exceptionPort = new Port<Exception>();
    Arbiter.Activate(taskQueue, exceptionPort.Receive(e =>
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("Caught " + e.ToString());
            Console.ResetColor();
        }));
    Dispatcher.AddCausality(new Causality("Example", exceptionPort));

這樣,我們可以用普通方式建立任務或傳遞訊息了,Causality會捕獲任何任務在執行過程中產生的異常,並由Causality交給異常埠,並在相關的委託物件上執行。

結論

CCR使您的應用程式能夠用它提供的這些方式來表現,這包括一些基礎的資料依賴關係,在執行時排程資料以使用有效的CPU核。這些作法直接將您從顯式的執行緒和鎖控制中釋放開來,同時讓您的應用程式能夠完全利用日益強大的多核運算資源。

注意:文章中所有觀點均為作者所有,與其僱主無關。

文章中的所有示例能夠在執行在免費提供的“Microsoft Robotics Developer Studio 2008 Express Edition”中。請仔細閱讀許可協議以獲取更多資訊。

給InfoQ中文站投稿或者參與內容翻譯工作,請郵件至[email protected]。也歡迎大家加入到InfoQ中文站使用者討論組中與我們的編輯和其他讀者朋友交流。