1. 程式人生 > >在.NET Core中使用Channel(一)

在.NET Core中使用Channel(一)

我最近一直在熟悉.net Core中引入的新Channel<T>型別。我想在它第一次釋出的時候我瞭解過它,但是有關文章非常非常少,我不能理解它們與其他佇列有什麼不同。

在使用了一段時間後,我終於看到了它們的吸引力和真正的力量。最值得注意的是大型非同步後臺操作,這些操作幾乎需要雙向通訊來同步它們正在做的事情。這句話有點拗口,但希望在本系列文章結束時,你會清楚什麼時候應該使用Channel<T>,什麼時候應該使用一些更基本的東西,比如Queue<T>。

Channel是什麼?

從核心來說,Channel本質上是.net中的一種新的集合型別,它與現有的Queue<T>型別非常相似,但有額外的好處。在真正嘗試研究這個主題時,我發現的問題是,許多現有的外部佇列技術(IBM MQ、Rabbit MQ等)都有“channel”的概念,它們的範圍從完全抽象的思維過程,到系統中實際的物理型別。

現在也許我大錯特錯,但如果你認為.net中的Channel就好比是允許等待新訊息的一個佇列,並告訴生產者要保持佇列越來越大,消費者無法跟上,我認為這很難出錯。

這裡我提到了一個關鍵詞,生產者/消費者。你可能還聽說過Pub/Sub。但它們是不可互換的。

Pub/Sub描述的是某人釋出資訊,一個或多個“訂閱者”監聽該資訊並對其採取一定的響應行為。這裡不存在負載平衡,因為當新增訂閱伺服器時,它們本質上與其他所有人獲得相同訊息的副本。

在圖表形式中,Pub/Sub看起來有點像這樣:

生產者/消費者描述生產者釋出訊息的行為,並且有一個或多個消費者可以對該訊息進行操作,但是每個訊息只讀取一次。它不會分發到每個訂閱者。

當然,用圖表的形式:

另一種思考生產者/消費者的方式是想象你去超市結賬。當顧客想結帳時,排隊的隊伍變長了,你可以簡單地開啟更多的收銀臺來處理這些顧客。這個小小的思考過程實際上是很重要的,因為如果你不能開啟更多的收銀臺怎麼辦?排隊的隊伍應該越來越長嗎?如果收銀臺操作員坐在那裡,但沒有顧客怎麼辦?他們是應該當天就打包回家呢,還是應該被告知坐著等客人來了再說。

這通常被稱為生產者-消費者問題,這是Channel要解決的問題。

基礎Channel示例

與Channel有關的所有東西都在System.Threading.Channels中。在以後的版本中,這似乎是與標準的.net Core專案捆綁在一起的,但如果不是,這裡有一個nuget包:https://www.nuget.org/packages/System.Threading.Channels。

一個極其簡單的Channel示例是這樣的:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded();

    for (int i = 0; i < 10; i++)
    {
        await myChannel.Writer.WriteAsync(i);
    }

    while (true)
    {
        var item = await myChannel.Reader.ReadAsync();
        Console.WriteLine(item);
    }
}

這裡沒有太多可談的。我們建立了一個“無限的”通道(這意味著它可以容納無限項,但在本系列的後續內容中會有更多內容)。我們寫10項,讀10項,在這一點上,它與我們在.net中見過的任何其他佇列沒有太大區別。

Channel是執行緒安全的

沒錯,通道是執行緒安全的。這意味著多個執行緒可以讀寫同一個通道而不會出現問題。如果我們看一下這裡的Channel原始碼,我們可以看到它是執行緒安全的,因為它使用鎖和內部“佇列”的組合來同步讀/寫器,一個接一個地讀/寫。

實際上,Channel的預期用例是多執行緒場景。例如,如果我們使用上面的基本程式碼,當我們實際上不需要執行緒安全性時,維護執行緒安全性實際上會有一些開銷。所以在那個例子中,我們可能只使用Queue<T>更好。但是這段程式碼呢?

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
            await Task.Delay(1000);
        }
    });

    while (true)
    {
        var item = await myChannel.Reader.ReadAsync();
        Console.WriteLine(item);
    }
}

在這裡,我們有一個單獨的執行緒寫入訊息,而我們的主執行緒讀取訊息。你會注意到有趣的事情是,我們添加了訊息之間的延遲。怎麼能呼叫ReadAsync()?沒有TryDequeue或Dequeue,如果佇列中沒有訊息,它就執行null,對嗎?

答案是Channel Reader的“ReadAsync()”方法實際上會“等待”一個訊息。因此,不需要在等待訊息時執行一些荒謬的迴圈,也不需要在等待時完全阻塞執行緒。我們將在以後的文章中進一步討論這個問題,但是你要知道你可以使用ReadAsync來等待新的訊息,而不是編寫一些自定義的程式碼來做同樣的事情。

接下來是什麼?

現在你已經掌握了基礎知識,下一篇讓我們看看使用Channel一些更高階的場景。

 歡迎關注我的公眾號,如果你有喜歡的外文技術文章,可以通過公眾號留言推薦給我。

 

原文連結:https://dotnetcoretutorials.com/2020/11/24/using-channels-in-net-core-part-1-getting-started/