.Net執行緒安全集合
執行緒安全集合
.NET Framework 4 引入了 System.Collections.Concurrent 名稱空間,其中包含多個執行緒安全且可縮放的集合類。 多個執行緒可以安全高效地從這些集合新增或刪除項,而無需在使用者程式碼中進行其他同步。 編寫新程式碼時,只要將多個執行緒同時寫入到集合時,就使用併發集合類。 如果僅從共享集合進行讀取,則可使用 System.Collections.Generic 名稱空間中的類。 建議不要使用 1.0 集合類,除非需要定位 .NET Framework 1.1 或更低版本執行時。
.NET Framework 1.0 和 2.0 集合中的執行緒同步
.NET Framework 1.0 中引入的集合位於 System.Collections 名稱空間中。 這些集合(包括常用的 ArrayList 和 Hashtable)通過 Synchronized
屬性(此屬性圍繞集合返回執行緒安全的包裝器)提供一些執行緒安全性。 該包裝器通過對每個新增或刪除操作鎖定整個集合進行工作。 因此,每個嘗試訪問集合的執行緒必須等待,直到輪到它獲取鎖定。 這不可縮放,並且可能導致大型集合的效能顯著下降。 此外,這一設計並不能完全防止爭用情況的出現。 有關詳細資訊,請參閱泛型集合中的同步。
.NET Framework 2.0 中引入的集合類位於
建議使用 .NET Framework 4 中的併發集合類,因為它們不僅能夠提供 .NET Framework 2.0 集合類的型別安全性,而且能夠比 .NET Framework 1.0 集合更高效完整地提供執行緒安全性。
細粒度鎖定和無鎖機制
某些併發集合型別使用輕量同步機制,如 SpinLock、SpinWait、SemaphoreSlim 和 CountdownEvent,這些機制是 .NET Framework 4 中的新增功能。 這些同步型別通常在將執行緒真正置於等待狀態之前,會在短時間內使用忙旋轉。 預計等待時間非常短時,旋轉比等待所消耗的計算資源少得多,因為後者涉及資源消耗量大的核心轉換。 對於使用旋轉的集合類,這種效率意味著多個執行緒能夠以非常快的速率新增和刪除項。 有關旋轉與鎖定的詳細資訊,請參閱 SpinLock 和 SpinWait。
ConcurrentQueue<T> 和 ConcurrentStack<T> 類完全不使用鎖定。 相反,它們依賴於 Interlocked 操作來實現執行緒安全性。
備註
由於併發集合類支援 ICollection,因此該類可提供針對 IsSynchronized 和 SyncRoot 屬性的實現,即使這些屬性不相關。 IsSynchronized
始終返回 false
,而 SyncRoot
始終為 null
(在 Visual Basic 中是 Nothing
)。
下表列出了 System.Collections.Concurrent 名稱空間中的集合型別。
型別 | 描述 |
---|---|
BlockingCollection<T> | 為實現 IProducerConsumerCollection<T> 的所有型別提供限制和阻止功能。 有關詳細資訊,請參閱 BlockingCollection 概述。 |
ConcurrentDictionary<TKey,TValue> | 鍵值對字典的執行緒安全實現。 |
ConcurrentQueue<T> | FIFO(先進先出)佇列的執行緒安全實現。 |
ConcurrentStack<T> | LIFO(後進先出)堆疊的執行緒安全實現。 |
ConcurrentBag<T> | 無序元素集合的執行緒安全實現。 |
IProducerConsumerCollection<T> | 型別必須實現以在 BlockingCollection 中使用的介面。 |
相關主題
Title | 描述 |
---|---|
BlockingCollection 概述 | 描述 BlockingCollection<T> 型別提供的功能。 |
如何:在 ConcurrentDictionary 中新增和移除項 | 描述如何從 ConcurrentDictionary<TKey,TValue> 新增和刪除元素 |
如何:在 BlockingCollection 中逐個新增和取出項 | 描述如何在不使用只讀列舉器的情況下,從阻止的集合新增和檢索項。 |
如何:向集合新增限制和阻塞功能 | 描述如何將任一集合類用作 IProducerConsumerCollection<T> 集合的基礎儲存機制。 |
如何:使用 ForEach 移除 BlockingCollection 中的項 | 介紹瞭如何使用 foreach (在 Visual Basic 中是 For Each )在鎖定集合中刪除所有項。 |
如何:在管道中使用阻塞集合的陣列 | 描述如何同時使用多個阻塞集合來實現一個管道。 |
如何:使用 ConcurrentBag 建立目標池 | 演示如何使用併發包在可重用物件(而不是繼續建立新物件)的情況下改進效能。 |
參考
建議的內容
-
Interlocked.Increment 方法 (System.Threading)
以原子操作的形式遞增指定變數的值並存儲結果。
-
Semaphore 類 (System.Threading)
限制可同時訪問某一資源或資源池的執行緒數。
-
ConcurrentBag<T> 類 (System.Collections.Concurrent)
表示物件的執行緒安全的無序集合。
何時使用執行緒安全集合
瞭解何時在 .NET 中使用執行緒安全集合。 有五種專門為支援多執行緒新增和刪除操作而設計的集合型別。
BlockingCollection 概述
BlockingCollection<T> 是一個執行緒安全集合類,可提供以下功能:
-
實現製造者-使用者模式。
-
通過多執行緒併發新增和獲取項。
-
可選最大容量。
-
集合為空或已滿時通過插入和移除操作進行阻塞。
-
插入和移除“嘗試”操作不發生阻塞,或在指定時間段內發生阻塞。
-
封裝實現 IProducerConsumerCollection<T> 的任何集合型別
-
使用取消標記執行取消操作。
-
支援使用
foreach
(在 Visual Basic 中,使用For Each
)的兩種列舉:-
只讀列舉。
-
在列舉項時將項移除的列舉。
-
限制和阻塞支援
BlockingCollection<T> 支援限制和阻塞。 限制意味著可以設定集合的最大容量。 限制在某些情況中很重要,因為它使你能夠控制記憶體中的集合的最大大小,並可阻止製造執行緒移動到離使用執行緒前方太遠的位置。
多個執行緒或任務可同時向集合新增項,如果集合達到其指定最大容量,則製造執行緒將發生阻塞,直到移除集合中的某個項。 多個使用者可以同時移除項,如果集合變空,則使用執行緒將發生阻塞,直到製造者新增某個項。 製造執行緒可呼叫 CompleteAdding 來指示不再新增項。 使用者將監視 IsCompleted 屬性以瞭解集合何時為空且不再新增項。 下面的示例展示了容量上限為 100 的簡單 BlockingCollection。 只要滿足某些外部條件為 true,製造者任務就會向集合新增項,然後呼叫 CompleteAdding。 使用者任務獲取項,直到 IsCompleted 屬性為 true。
C#// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);
// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
while (!dataItems.IsCompleted)
{
Data data = null;
// Blocks if dataItems.Count == 0.
// IOE means that Take() was called on a completed collection.
// Some other thread can call CompleteAdding after we pass the
// IsCompleted check but before we call Take.
// In this example, we can simply catch the exception since the
// loop will break on the next iteration.
try
{
data = dataItems.Take();
}
catch (InvalidOperationException) { }
if (data != null)
{
Process(data);
}
}
Console.WriteLine("\r\nNo more items to take.");
});
// A simple blocking producer with no cancellation.
Task.Run(() =>
{
while (moreItemsToAdd)
{
Data data = GetData();
// Blocks if numbers.Count == dataItems.BoundedCapacity
dataItems.Add(data);
}
// Let consumer know we are done.
dataItems.CompleteAdding();
});
有關完整的示例,請參閱 如何:在 BlockingCollection 中逐個新增和取出項。
計時阻塞操作
在針對有限集合的計時阻塞 TryAdd 和 TryTake 操作中,此方法將嘗試新增或取出某個項。 如果項可用,項會被置於通過引用傳入的變數中,然後方法返回 true。 如果在指定的超時期限過後未檢索到任何項,方法返回 false。 相應執行緒可以任意執行一些其他有用的工作,然後再重新嘗試訪問該集合。 有關計時阻塞訪問的示例,請參閱如何:在 BlockingCollection 中逐個新增和取出項中的第二個示例。
取消新增和取出操作
新增和取出操作通常會在一個迴圈內執行。 可以通過以下方法來取消迴圈:向 TryAdd 或 TryTake 方法傳入 CancellationToken,然後在每次迭代時檢查該標記的 IsCancellationRequested 屬性的值。 如果值為 true,由你自行決定是否通過清理所有資源並退出迴圈來響應取消請求。 下面的示例演示獲取取消標記和使用該標記的程式碼的 TryAdd 過載:
C#do
{
// Cancellation causes OCE. We know how to handle it.
try
{
success = bc.TryAdd(itemToAdd, 2, ct);
}
catch (OperationCanceledException)
{
bc.CompleteAdding();
break;
}
//...
} while (moreItems == true);
有關如何新增取消支援的示例,請參閱如何:在 BlockingCollection 中逐個新增和取出項中的第二個示例。
指定集合型別
建立 BlockingCollection<T> 時,不僅可以指定上限容量,而且可以指定要使用的集合型別。 例如,可為先進先出 (FIFO) 行為指定 ConcurrentQueue<T>,也可為後進先出 (LIFO) 行為指定 ConcurrentStack<T>。 可使用實現 IProducerConsumerCollection<T> 介面的任何集合類。 BlockingCollection<T> 的預設集合型別為 ConcurrentQueue<T>。 下面的程式碼示例演示如何建立字串的 BlockingCollection<T>,其容量為 1000 並使用 ConcurrentBag<T>:
C#BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );
有關詳細資訊,請參閱如何:向集合新增限制和阻塞功能。
IEnumerable 支援
BlockingCollection<T> 提供 GetConsumingEnumerable 方法,該方法允許使用者使用 foreach
(在 Visual Basic 中為 For Each
)刪除項直到完成集合,也就是說,集合為空且不再新增項。 有關詳細資訊,請參閱如何:使用 ForEach 移除 BlockingCollection 中的項。
將多個 BlockingCollection 作為整體使用
在使用者需要同時取出多個集合中的項的情況下,可以建立 BlockingCollection<T> 的陣列並使用靜態方法,如 TakeFromAny 和 AddToAny 方法,這兩個方法可以在該陣列的任意集合中新增或取出項。 如果一個集合發生阻塞,此方法會立即嘗試其他集合,直到找到能夠執行該操作的集合。 有關詳細資訊,請參閱如何:在管道中使用阻塞集合的陣列。
請參閱
何時使用執行緒安全集合
.NET Framework 4 引入了五種專為支援多執行緒新增和刪除操作而設計的集合型別。 為了實現執行緒安全,這些型別使用多種高效的鎖定和免鎖定同步機制。 同步會增加操作的開銷。 開銷數取決於所用的同步型別、執行的操作型別和其他因素,例如嘗試並行訪問該集合的執行緒數。
在某些方案中,同步開銷可忽略不計,使多執行緒型別的執行速度和縮放水平遠遠超過其受外部鎖保護的非執行緒安全同等型別。 在其他方案中,開銷可能會導致執行緒安全型別的執行速度和縮放水平與該型別外部鎖定的非執行緒安全版本相同,甚至更差。
以下部分提供有關何時使用執行緒安全集合與其非執行緒安全同等集合(其讀寫操作受使用者提供的鎖定保護)的通用指南。 由於效能可能因多種因素而異,所以本指南並不針對某特定情況且不一定對所有情況都有效。 如果效能非常重要,那麼確定要使用的集合型別的最佳方式是基於典型計算機配置和負載衡量效能。 本文件使用以下術語:
純生成方-使用方方案
任何給定執行緒要麼新增元素,要麼刪除元素,兩種操作不能同時執行。
混合生成方-使用方方案
任何給定執行緒可同時新增和刪除元素。
加速
相對於同一方案中其他型別更快速的演算法效能。
可伸縮性
與計算機核心數相稱的效能提升。 一種可伸縮的演算法,相比兩個核心,八個核心上的執行速度更快。
ConcurrentQueue (T) 與 Queue(T)
在純製造者-使用者方案中,每個元素的處理時間都非常短(幾條指令),而相比帶有外部鎖的 System.Collections.Generic.Queue<T>,System.Collections.Concurrent.ConcurrentQueue<T> 可提供適度的效能優勢。 在此方案中,當某一專用執行緒排隊,而另一專用執行緒取消排隊時,ConcurrentQueue<T> 的效能最佳。 如果不強制執行此規則,那麼 Queue<T> 在多核心計算機上的執行速度甚至可能稍快於 ConcurrentQueue<T>。
處理時間大約為 500 FLOPS(浮點運算)或更長時,該雙執行緒規則不適用於 ConcurrentQueue<T>,這將具有很好的可伸縮性。 Queue<T> 在此情況下無法正常伸縮。
在混合製造者-使用者方案中,處理時間非常短時,帶外部鎖的 Queue<T> 的伸縮性優於 ConcurrentQueue<T>。 但是,處理時間大約為 500 FLOPS 或更長時,ConcurrentQueue<T> 的伸縮性更佳。
ConcurrentStack 與堆疊
在純製造者-使用者方案中,當處理時間非常短時,System.Collections.Concurrent.ConcurrentStack<T> 和帶外部鎖的 System.Collections.Generic.Stack<T> 在使用一個專用推送執行緒和一個專用彈出執行緒時的執行效能可能大致相同。 但是,隨著執行緒數的增加,這兩種型別的執行效能會因爭用增加而降低,並且 Stack<T> 的執行效能可能優於 ConcurrentStack<T>。 處理時間大約為 500 FLOPS 或更長時,這兩種型別的伸縮速率大致相同。
在混合製造者-使用者方案中,對於小型和大型工作負荷,ConcurrentStack<T> 的速度更快。
使用 PushRange 和 TryPopRange 可能會大大加快訪問速度。
ConcurrentDictionary 與詞典
通常,在從多個執行緒中並行新增和更新鍵或值的任何方案中,會使用 System.Collections.Concurrent.ConcurrentDictionary<TKey,TValue>。 在涉及頻繁更新和相對較少讀取操作的方案中,ConcurrentDictionary<TKey,TValue> 通常具備一些優勢。 在涉及許多讀取和更新操作的方案中,ConcurrentDictionary<TKey,TValue> 通常在具備任意數量核心的計算機上執行速度更快。
在涉及頻繁更新的方案中,可以提高 ConcurrentDictionary<TKey,TValue> 中的併發度,然後進行衡量,檢視含有多個核心的計算機的效能是否有所提升。 如果更改併發級別,請儘可能避免全域性操作。
如果僅讀取鍵或值,Dictionary<TKey,TValue> 的速度會更快,因為如果字典未被任何執行緒修改,則無需同步。
ConcurrentBag
在純製造者-使用者方案中,System.Collections.Concurrent.ConcurrentBag<T> 的執行速度可能慢於其他併發集合型別。
在混合製造者-使用者方案中,對於大型和小型工作負荷,相比其他任何併發集合型別,往往 ConcurrentBag<T> 的執行速度更快且伸縮性更佳。
BlockingCollection
需要限制和阻止語義時,System.Collections.Concurrent.BlockingCollection<T> 的執行速度可能優於任何自定義實現。 它還支援諸多取消、列舉和異常處理操作。
請參閱
如何:在 BlockingCollection 中逐個新增和取出項
此示例展示瞭如何以阻止性和非阻止性方式在 BlockingCollection<T> 中新增和刪除項。 有關 BlockingCollection<T> 的詳細資訊,請參閱 BlockingCollection 概述。
有關如何列舉 BlockingCollection<T> 直至其為空且不再新增更多元素的示例,請參閱如何:使用 ForEach 移除 BlockingCollection 中的項。
示例
第一個示例展示瞭如何新增和取出項,以便在集合暫時為空(取出時)或達到最大容量(新增時),或超過指定超時期限時,阻止相應操作。 注意,僅當已建立 BlockingCollection 且建構函式中指定了最大容量時,才會啟用在達到最大容量時進行阻止的功能。
C#using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main()
{
// Increase or decrease this value as desired.
int itemsToAdd = 500;
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
int width = Math.Max(Console.BufferWidth, 80);
int height = Math.Max(Console.BufferHeight, itemsToAdd * 2 + 3);
// Preserve all the display output for Adds and Takes
Console.SetBufferSize(width, height);
}
// A bounded collection. Increase, decrease, or remove the
// maximum capacity argument to see how it impacts behavior.
var numbers = new BlockingCollection<int>(50);
// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
int i = -1;
while (!numbers.IsCompleted)
{
try
{
i = numbers.Take();
}
catch (InvalidOperationException)
{
Console.WriteLine("Adding was completed!");
break;
}
Console.WriteLine("Take:{0} ", i);
// Simulate a slow consumer. This will cause
// collection to fill up fast and thus Adds wil block.
Thread.SpinWait(100000);
}
Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
});
// A simple blocking producer with no cancellation.
Task.Run(() =>
{
for (int i = 0; i < itemsToAdd; i++)
{
numbers.Add(i);
Console.WriteLine("Add:{0} Count={1}", i, numbers.Count);
}
// See documentation for this method.
numbers.CompleteAdding();
});
// Keep the console display open in debug mode.
Console.ReadLine();
}
}
示例
第二個示例演示如何新增和取出項以便不會阻止操作。 如果沒有任何項、已達到繫結集合的最大容量或已超過超時期限,TryAdd 或 TryTake 操作返回 false。 這樣一來,執行緒可以暫時執行其他一些有用的工作,並在稍後再次嘗試檢索新項,或嘗試新增先前無法新增的相同項。 該程式還演示如何在訪問 BlockingCollection<T> 時實現取消。
C#using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
class ProgramWithCancellation
{
static int inputs = 2000;
static void Main()
{
// The token source for issuing the cancelation request.
var cts = new CancellationTokenSource();
// A blocking collection that can hold no more than 100 items at a time.
var numberCollection = new BlockingCollection<int>(100);
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
int width = Math.Max(Console.BufferWidth, 80);
int height = Math.Max(Console.BufferHeight, 8000);
// Preserve all the display output for Adds and Takes
Console.SetBufferSize(width, height);
}
// The simplest UI thread ever invented.
Task.Run(() =>
{
if (Console.ReadKey(true).KeyChar == 'c')
{
cts.Cancel();
}
});
// Start one producer and one consumer.
Task t1 = Task.Run(() => NonBlockingConsumer(numberCollection, cts.Token));
Task t2 = Task.Run(() => NonBlockingProducer(numberCollection, cts.Token));
// Wait for the tasks to complete execution
Task.WaitAll(t1, t2);
cts.Dispose();
Console.WriteLine("Press the Enter key to exit.");
Console.ReadLine();
}
static void NonBlockingConsumer(BlockingCollection<int> bc, CancellationToken ct)
{
// IsCompleted == (IsAddingCompleted && Count == 0)
while (!bc.IsCompleted)
{
int nextItem = 0;
try
{
if (!bc.TryTake(out nextItem, 0, ct))
{
Console.WriteLine(" Take Blocked");
}
else
{
Console.WriteLine(" Take:{0}", nextItem);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Taking canceled.");
break;
}
// Slow down consumer just a little to cause
// collection to fill up faster, and lead to "AddBlocked"
Thread.SpinWait(500000);
}
Console.WriteLine("\r\nNo more items to take.");
}
static void NonBlockingProducer(BlockingCollection<int> bc, CancellationToken ct)
{
int itemToAdd = 0;
bool success = false;
do
{
// Cancellation causes OCE. We know how to handle it.
try
{
// A shorter timeout causes more failures.
success = bc.TryAdd(itemToAdd, 2, ct);
}
catch (OperationCanceledException)
{
Console.WriteLine("Add loop canceled.");
// Let other threads know we're done in case
// they aren't monitoring the cancellation token.
bc.CompleteAdding();
break;
}
if (success)
{
Console.WriteLine(" Add:{0}", itemToAdd);
itemToAdd++;
}
else
{
Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd.ToString(), bc.Count);
// Don't increment nextItem. Try again on next iteration.
//Do something else useful instead.
UpdateProgress(itemToAdd);
}
} while (itemToAdd < inputs);
// No lock required here because only one producer.
bc.CompleteAdding();
}
static void UpdateProgress(int i)
{
double percent = ((double)i / inputs) * 100;
Console.WriteLine("Percent complete: {0}", percent);
}
}
另請參閱
使用 foreach 刪除 BlockingCollection 中的項
除了使用 Take 和 TryTake 方法從 BlockingCollection<T> 中提取項之外,還可結合使用 foreach(在 Visual Basic 中為 For Each)和 BlockingCollection<T>.GetConsumingEnumerable 刪除項,直至新增完成且集合為空。 由於與典型的 foreach
(For Each
) 迴圈不同,此列舉器通過刪除項來修改源集合,因此將其稱作 轉變列舉 或 耗用列舉。
示例
以下示例演示如何使用 foreach
(For Each
) 迴圈刪除 BlockingCollection<T> 中的所有項。
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
class Example
{
// Limit the collection size to 2000 items at any given time.
// Set itemsToProduce to > 500 to hit the limit.
const int UpperLimit = 1000;
// Adjust this number to see how it impacts the producing-consuming pattern.
const int ItemsToProduce = 100;
static readonly BlockingCollection<long> Collection =
new BlockingCollection<long>(UpperLimit);
// Variables for diagnostic output only.
static readonly Stopwatch Stopwatch = new Stopwatch();
static int TotalAdditions = 0;
static async Task Main()
{
Stopwatch.Start();
// Queue the consumer task.
var consumerTask = Task.Run(() => RunConsumer());
// Queue the producer tasks.
var produceTaskOne = Task.Run(() => RunProducer("A", 0));
var produceTaskTwo = Task.Run(() => RunProducer("B", ItemsToProduce));
var producerTasks = new[] { produceTaskOne , produceTaskTwo };
// Create a cleanup task that will call CompleteAdding after
// all producers are done adding items.
var cleanupTask = Task.Factory.ContinueWhenAll(producerTasks, _ => Collection.CompleteAdding());
// Wait for all tasks to complete
await Task.WhenAll(consumerTask, produceTaskOne, produceTaskTwo, cleanupTask);
// Keep the console window open while the
// consumer thread completes its output.
Console.WriteLine("Press any key to exit");
Console.ReadKey(true);
}
static void RunProducer(string id, int start)
{
var additions = 0;
for (var i = start; i < start + ItemsToProduce; i++)
{
// The data that is added to the collection.
var ticks = Stopwatch.ElapsedTicks;
// Display additions and subtractions.
Console.WriteLine($"{id} adding tick value {ticks}. item# {i}");
if (!Collection.IsAddingCompleted)
{
Collection.Add(ticks);
}
// Counter for demonstration purposes only.
additions++;
// Comment this line to speed up the producer threads.
Thread.SpinWait(100000);
}
Interlocked.Add(ref TotalAdditions, additions);
Console.WriteLine($"{id} is done adding: {additions} items");
}
static void RunConsumer()
{
// GetConsumingEnumerable returns the enumerator for the underlying collection.
var subtractions = 0;
foreach (var item in Collection.GetConsumingEnumerable())
{
Console.WriteLine(
$"Consuming tick value {item:D18} : item# {subtractions++} : current count = {Collection.Count}");
}
Console.WriteLine(
$"Total added: {TotalAdditions} Total consumed: {subtractions} Current count: {Collection.Count}");
Stopwatch.Stop();
}
}
此示例將 foreach
迴圈與耗用執行緒中的 BlockingCollection<T>.GetConsumingEnumerable 方法結合使用,這會導致在列舉每個項時將其從集合中刪除。 System.Collections.Concurrent.BlockingCollection<T> 隨時限制集合中所包含的最大項數。 按照此方式列舉集合會在沒有項可用或集合為空時阻止使用者執行緒。 在此示例中,由於製造者執行緒新增項的速度快於消耗項的速度,因此不需要考慮阻止問題。
BlockingCollection<T>.GetConsumingEnumerable 返回了 IEnumerable<T>
,因此無法保證順序。 但是,在內部將 System.Collections.Concurrent.ConcurrentQueue<T> 用作基礎集合型別,這將按照先進先出 (FIFO) 的順序取消物件的排隊。 如果對 BlockingCollection<T>.GetConsumingEnumerable 進行了併發呼叫,這些呼叫會爭用。 無法在一個列舉中觀察到在另一個列舉中使用(已取消排隊)的專案。
若要列舉集合而不對其進行修改,只需使用 foreach
(For Each
) 即可,無需使用 GetConsumingEnumerable 方法。 但是,務必要了解此類列舉表示的是某個精確時間點的集合快照。 如果其他執行緒在你執行迴圈的同時新增或刪除項,則迴圈可能不會表示集合的實際狀態。
請參閱
如何在 ConcurrentDictionary 中新增和刪除項
本示例演示如何在 System.Collections.Concurrent.ConcurrentDictionary<TKey,TValue> 中新增、檢索、更新和刪除項。 此集合類是一個執行緒安全實現。 建議在多個執行緒可能同時嘗試訪問元素時使用此集合類。
ConcurrentDictionary<TKey,TValue> 提供了多個便捷的方法,這些方法使程式碼在嘗試新增或移除資料之前無需先檢查鍵是否存在。 下表列出了這些便捷的方法,並說明在何種情況下這些方法。
方法 | 何時使用… |
---|---|
AddOrUpdate | 需要為指定鍵新增新值,如果此鍵已存在,則需要替換其值。 |
GetOrAdd | 需要檢索指定鍵的現有值,如果此鍵不存在,則需要指定一個鍵/值對。 |
TryAdd, TryGetValue, TryUpdate, TryRemove | 需要新增、獲取、更新或移除鍵/值對,如果此鍵已存在或因任何其他原因導致嘗試失敗,則需執行某種備選操作。 |
示例
下面的示例使用兩個 Task 例項將一些元素同時新增到 ConcurrentDictionary<TKey,TValue> 中,然後輸出所有內容,指明元素已成功新增。 此示例還演示如何使用 AddOrUpdate、TryGetValue 和 GetOrAdd 方法在集合中新增、更新、檢索和刪除專案。
C#using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace DictionaryHowTo
{
// The type of the Value to store in the dictionary.
class CityInfo : IEqualityComparer<CityInfo>
{
public string Name { get; set; }
public DateTime LastQueryDate { get; set; } = DateTime.Now;
public decimal Longitude { get; set; } = decimal.MaxValue;
public decimal Latitude { get; set; } = decimal.MaxValue;
public int[] RecentHighTemperatures { get; set; } = new int[] { 0 };
public bool Equals(CityInfo x, CityInfo y)
=> (x.Name, x.Longitude, x.Latitude) ==
(y.Name, y.Longitude, y.Latitude);
public int GetHashCode(CityInfo cityInfo) =>
cityInfo?.Name.GetHashCode() ?? throw new ArgumentNullException(nameof(cityInfo));
}
class Program
{
static readonly ConcurrentDictionary<string, CityInfo> Cities =
new ConcurrentDictionary<string, CityInfo>(StringComparer.OrdinalIgnoreCase);
static async Task Main()
{
CityInfo[] cityData =
{
new CityInfo { Name = "Boston", Latitude = 42.358769m, Longitude = -71.057806m, RecentHighTemperatures = new int[] { 56, 51, 52, 58, 65, 56,53} },
new CityInfo { Name = "Miami", Latitude = 25.780833m, Longitude = -80.195556m, RecentHighTemperatures = new int[] { 86,87,88,87,85,85,86 } },
new CityInfo { Name = "Los Angeles", Latitude = 34.05m, Longitude = -118.25m, RecentHighTemperatures = new int[] { 67,68,69,73,79,78,78 } },
new CityInfo { Name = "Seattle", Latitude = 47.609722m, Longitude = -122.333056m, RecentHighTemperatures = new int[] { 49,50,53,47,52,52,51 } },
new CityInfo { Name = "Toronto", Latitude = 43.716589m, Longitude = -79.340686m, RecentHighTemperatures = new int[] { 53,57, 51,52,56,55,50 } },
new CityInfo { Name = "Mexico City", Latitude = 19.432736m, Longitude = -99.133253m, RecentHighTemperatures = new int[] { 72,68,73,77,76,74,73 } },
new CityInfo { Name = "Rio de Janeiro", Latitude = -22.908333m, Longitude = -43.196389m, RecentHighTemperatures = new int[] { 72,68,73,82,84,78,84 } },
new CityInfo { Name = "Quito", Latitude = -0.25m, Longitude = -78.583333m, RecentHighTemperatures = new int[] { 71,69,70,66,65,64,61 } },
new CityInfo { Name = "Milwaukee", Latitude = -43.04181m, Longitude = -87.90684m, RecentHighTemperatures = new int[] { 32,47,52,64,49,44,56 } }
};
// Add some key/value pairs from multiple threads.
await Task.WhenAll(
Task.Run(() => TryAddCities(cityData)),
Task.Run(() => TryAddCities(cityData)));
static void TryAddCities(CityInfo[] cities)
{
for (var i = 0; i < cities.Length; ++i)
{
var (city, threadId) = (cities[i], Thread.CurrentThread.ManagedThreadId);
if (Cities.TryAdd(city.Name, city))
{
Console.WriteLine($"Thread={threadId}, added {city.Name}.");
}
else
{
Console.WriteLine($"Thread={threadId}, could not add {city.Name}, it was already added.");
}
}
}
// Enumerate collection from the app main thread.
// Note that ConcurrentDictionary is the one concurrent collection
// that does not support thread-safe enumeration.
foreach (var city in Cities)
{
Console.WriteLine($"{city.Key} has been added.");
}
AddOrUpdateWithoutRetrieving();
TryRemoveCity();
RetrieveValueOrAdd();
RetrieveAndUpdateOrAdd();
Console.WriteLine("Press any key.");
Console.ReadKey();
}
// This method shows how to add key-value pairs to the dictionary
// in scenarios where the key might already exist.
static void AddOrUpdateWithoutRetrieving()
{
// Sometime later. We receive new data from some source.
var ci = new CityInfo
{
Name = "Toronto",
Latitude = 43.716589M,
Longitude = -79.340686M,
RecentHighTemperatures = new int[] { 54, 59, 67, 82, 87, 55, -14 }
};
// Try to add data. If it doesn't exist, the object ci is added. If it does
// already exist, update existingVal according to the custom logic.
_ = Cities.AddOrUpdate(
ci.Name,
ci,
(cityName, existingCity) =>
{
// If this delegate is invoked, then the key already exists.
// Here we make sure the city really is the same city we already have.
if (ci != existingCity)
{
// throw new ArgumentException($"Duplicate city names are not allowed: {ci.Name}.");
}
// The only updatable fields are the temperature array and LastQueryDate.
existingCity.LastQueryDate = DateTime.Now;
existingCity.RecentHighTemperatures = ci.RecentHighTemperatures;
return existingCity;
});
// Verify that the dictionary contains the new or updated data.
Console.Write($"Most recent high temperatures for {ci.Name} are: ");
var temps = Cities[ci.Name].RecentHighTemperatures;
Console.WriteLine(string.Join(", ", temps));
}
// This method shows how to use data and ensure that it has been
// added to the dictionary.