阻塞資料流的檢測和測試 – RxJava
原文連結 作者:Tomasz Nurkiewicz 譯者:simonwang
想象一下,你有一個數據流以不定的頻率推送一些event。在某些時候,你期望資料流會以每秒幾十個訊息的頻率進行推送,但事實上幾秒時間內一個event都沒被推送。如果你的資料流是通過web socket,SSE或者其他網路協議進行傳輸,那麼長時間的資料靜默則可以斷定為網路問題。這時我們經常會人為地傳送events(pings)來確保:
- clients正常執行
- 讓clients知道我方也正常執行
舉個具體的例子,假如我們有一個Flowable流來提供events,如果超過1秒沒有接收到event,我們就會發送”PING”字串作為佔位符。當靜默的時間過長時,那麼”PING”每秒都會出現一次。那麼這樣的需求我們怎樣能利用RxJava來實現呢?最明顯的,但不正確的做法是將兩個原始的流merge在一起:
Flowable<String> events = //... Flowable<String> pings = Flowable .interval(1, SECONDS) .map(x -> "PING"); Flowable<String> eventsWithPings = events.mergeWith(pings);
mergeWith()操作很關鍵:它將真正的events和常量pings流聯合起來,當沒有events出現時,”PING”會出現。不幸的是,這種做法將會與原始流完全脫節。因為即使在events正常出現時,pings的傳送並不會停止。而且當靜默開始時,我們並不能精確地在每秒結束時傳送”PING”。當然,如果你對這種機制ok的話,那就不用往下看了。
debounce()操作符
另一個更復雜的方法需要能夠能識別出持續至少1秒的靜默,可以使用timeout()操作符做到這點。不幸的是,它能丟擲TimeoutException並且從上層資料流那裡unsubscribe-這種方法太不穩定。我們僅想要得到某種notification,事實證明debounce()可以做到這點。正常情況下,這個操作符為了防止在極短時間段內有新的events到來,它會推遲上一個event的推送(debounce()的詳細作用見ReactiveX官網定義)。我們重寫這個方法:
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS);
這就意味著,如果在1秒內某個event沒有後續的其他event跟隨,那麼delayed流就會將這個event傳送出去。嚴格說來,如果events流生產event的速度足夠快的話,delayed流將不會發送任何東西,我們將會利用delayed流獲取到靜默的產生:
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS); Flowable<String> pings = delayed.map(ev -> "PING"); Flowable<String> eventsWithPings = Flowable.merge(events, pings);
mergeWith()與靜態方法merge()的使用就結果而言並沒有差異。如果events流中event的接收與傳送頻繁,那麼delayed流不會接收到任何的event,”PING”資訊也就不會被髮送。然而,如果events流超過1秒沒有傳送任何event的話,delayed會將events流中最近接收到的一個event對映為”PING”。這種方法很聰明,但有個缺陷。在發生阻塞後,delayed流只會傳送一次”PING”,並不會週期性的每1秒鐘傳送一次。然而,解決這點很簡單!與其將最近接收到的event轉化為單個”PING”,我們可以將它轉化為週期性的pings序列:
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS); Flowable<String> pings = delayed .flatMap(x -> Flowable .interval(0, 1, SECONDS) .map(e -> "PING") ); Flowable<String> eventsWithPings = Flowable.merge(events, pings);
上面的程式碼仍然有缺陷,因為每次資料流發生一點點阻塞,之後的每秒我們都會發送pings。但我們需要的是在資料流恢復正常的events流動後,pings的推送要馬上停止,這一點我們並沒有做到。每一次上層流中發生的阻塞,都會產生一個新的無限pings流,使其出現在最終的merge流中。無論如何,在events流恢復正常的event推送之後,我們都應該通知delayed流停止傳送pings。而takeUntil()則剛好可以做到這點!
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS); Flowable<String> pings = delayed .flatMap(x -> Flowable .interval(0, 1, SECONDS) .map(e -> "PING") .takeUntil(events) ); Flowable<String> eventsWithPings = Flowable.merge(events, pings);
花點時間弄明白上面的程式碼片段。每當events流超過1秒沒有動靜的時候,delayed流都會發送一個event。當pings流接收到了從delayed流那裡傳送過來的event後,就會開始推送一個間隔1秒的”PING”序列。然而在events流重新開始傳送event後,pings流則會終止。所有的這些都可以用一個巢狀的語句表示:
Flowable<String> events = //... Flowable<String> eventsWithPings = events .mergeWith( events .debounce(1, SECONDS) .flatMap(x1 -> Flowable .interval(0, 1, SECONDS) .map(e -> "PING") .takeUntil(events) ));
Testability
好的,所有這些都已準備完畢,而我們該如何對一個有著三層巢狀的由事件驅動的程式碼塊進行測試呢?我們該如何確定pings能出現在正確的時間,並且當阻塞結束時它能正確終止呢?要怎樣模擬這種時變的場景?不用擔心,RxJava有很多殺手鐗,其中與時序有關的測試手段可能是最厲害的一個。首先,讓我們對pinging程式碼進行一些修改,使之多一點可測試性和通用性:
<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) { return events .mergeWith( events .debounce(1, SECONDS, clock) .flatMap(x1 -> Flowable .interval(0, 1, SECONDS, clock) .map(e -> ping) .takeUntil(events) )); }
這個工具方法能夠處理任意的T流,並且如果在一段時間內T流不產出任何event的話,它能將pings合到原始流中區去。在我們的測試中可以這樣使用它:
PublishProcessor<String> events = PublishProcessor.create(); TestScheduler clock = new TestScheduler(); Flowable<String> eventsWithPings = withPings(events, clock, "PING");
PublishProcessor是Flowable的子類,所以我們能夠用它來充當events流。另一方面,我們可以利用它的onNext()方法有命令地傳送events:
events.onNext("A");
如果某人監聽到了events流,他會馬上接收到”A”event。而clock又有什麼用呢?RxJava中的每個與時間有關的單一運算子(如debounce(), interval(), timeout(), window())都有一個可選的Scheduler引數,它的作用是作為外部的時間源。特殊的TestScheduler就是一個人造的時間源,我們可以完全的掌控它。也就是說,如果我們不呼叫advanceTimeBy(),時間就是靜止的:
clock.advanceTimeBy(999, MILLISECONDS);
999毫秒不是個巧合,Pings要精確的出現在1秒後,所以999毫秒之前pings應該不可見。現在是時候展示完整的測試例項了:
@Test public void shouldAddPings() throws Exception { PublishProcessor<String> events = PublishProcessor.create(); final TestScheduler clock = new TestScheduler(); final Flowable<String> eventsWithPings = withPings(events, clock, "PING"); final TestSubscriber<String> test = eventsWithPings.test(); events.onNext("A"); test.assertValues("A"); clock.advanceTimeBy(999, MILLISECONDS); events.onNext("B"); test.assertValues("A", "B"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B"); clock.advanceTimeBy(1, MILLISECONDS); test.assertValues("A", "B", "PING"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B", "PING"); events.onNext("C"); test.assertValues("A", "B", "PING", "C"); clock.advanceTimeBy(1000, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING"); clock.advanceTimeBy(1, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING"); events.onNext("D"); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D"); clock.advanceTimeBy(999, MILLISECONDS); events.onNext("E"); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E"); clock.advanceTimeBy(1, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING"); clock.advanceTimeBy(3_000, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING"); }
看起來像是一堵牆,但這確實是我們邏輯的一個完整測試場景。它確保pings精確地出現在了1000毫秒之後,並且週期性地重複出現,在正常的event再次出現後會馬上停止出現。最重要的一點是:這個測試是100%可預見的,並且速度極快。沒有Awaitility,busy waiting,polling以及間歇性的測試失敗和遲緩。人工的clock可以完全對其進行控制,以確保所有的組合流都能按預期的工作。