《C#併發程式設計經典例項》—— 傳送通知給上下文
問題
Rx 儘量做到了執行緒不可知(thread agnostic)。因此它會在任意一個活動執行緒中發出通知(例如 OnNext)。
但是我們通常希望通知只發給特定的上下文。例如 UI 元素只能被它所屬的 UI 執行緒控制, 因此,如果要根據 Rx 的通知來修改 UI,就應該把通知“轉移”到 UI 執行緒。
解決方案
Rx 提供了 ObserveOn 操作符,用來把通知轉移到其他執行緒排程器。 看下面的例子,使用 Interval,每秒鐘產生一個 OnNext 通知:
private void Button_Click(object sender, RoutedEventArgs e) { Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId); Observable.Interval(TimeSpan.FromSeconds(1)) .Subscribe(x => Trace.WriteLine("Interval " + x + " on thread " + Environment.CurrentManagedThreadId)); }
用我的電腦測試,顯示結果為:
UI thread is 9
Interval 0 on thread 10
Interval 1 on thread 10
Interval 2 on thread 11
Interval 3 on thread 11
Interval 4 on thread 10
Interval 5 on thread 11
Interval 6 on thread 11
因為 Interval 基於一個定時器(沒有指定的執行緒),通知會線上程池執行緒中引發,而不是 在 UI 執行緒中。要更新 UI 元素,可以通過 ObserveOn 輸送通知,並傳遞一個代表 UI 執行緒 的同步上下文:
private void Button_Click(object sender, RoutedEventArgs e) { var uiContext = SynchronizationContext.Current; Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId); Observable.Interval(TimeSpan.FromSeconds(1)) .ObserveOn(uiContext) .Subscribe(x => Trace.WriteLine("Interval " + x + " on thread " + Environment.CurrentManagedThreadId)); }
ObserveOn 的另一個常用功能是可以在必要時離開 UI 執行緒。假設有這樣的情況:滑鼠一移
動,就意味著需要進行一些 CPU 密集型的計算。預設情況下,所有的滑鼠移動事件都發 生在 UI 執行緒,因此可以使用 ObserveOn 把通知移動到一個執行緒池執行緒,在那裡進行計算, 然後再把表示結果的通知返回給 UI 執行緒:
private void Button_Click(object sender, RoutedEventArgs e)
{
var uiContext = SynchronizationContext.Current;
Trace.WriteLine(“UI thread is ” + Environment.CurrentManagedThreadId); Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a), handler => MouseMove += handler, handler => MouseMove -= handler)
.Select(evt => evt.EventArgs.GetPosition(this))
.ObserveOn(Scheduler.Default)
.Select(position =>
{
// 複雜的計算過程。
Thread.Sleep(100);
var result = position.X + position.Y; Trace.WriteLine(“Calculated result ” + result + ” on thread ” +
Environment.CurrentManagedThreadId);
return result;
})
.ObserveOn(uiContext)
.Subscribe(x => Trace.WriteLine(“Result ” + x + ” on thread ” + Environment.CurrentManagedThreadId));
}
執行這段程式碼的話,就會發現計算過程是線上程池執行緒中進行的,計算結果在 UI 執行緒中
顯示。另外,還會發現計算和結果會滯後於輸入,形成等待的佇列,這種現象出現的原因 在於,比起 100 秒 1 次的計算,滑鼠移動的更新頻率更高。Rx 中有幾種技術可以處理這 種情況,其中一個常用方法是對輸入流速進行限制,具體會在 5.4 節介紹。
討論
實際上,ObserveOn 是把通知轉移到一個 Rx 排程器上了。本節介紹了預設排程器(即執行緒 池)和一種建立 UI 排程器的方法。ObserveOn 最常用的功能是移到或移出 UI 執行緒,但調 度器也能用於別的場合。6.6 節介紹高階測試時,將再次關注排程器。