1. 程式人生 > >Flow Control(流控)

Flow Control(流控)

Backpressure(背壓)只是解決Flow Control的其中一個方案。

就像小學做的那道數學題:一個水池,有一個進水管和一個出水管。如果進水管水流更大,過一段時間水池就會滿(溢位)。這就是沒有Flow Control導致的結果。

而解決Flow Control有幾種思路呢?
(1)Backpressure,就是消費者需要多少,生產者就生產多少。這有點類似於TCP裡的流量控制,接收方根據自己的接收視窗的情況來控制接收速率,並通過反向的ACK包來控制傳送方的傳送速率。這種方案只對於cold Observable有效。cold Observable是那些允許降低速率的傳送源,比如兩臺機器傳一個檔案,速率可大可小,即使降低到每秒幾個位元組,只要時間足夠長,還是能夠完成的。相反的例子就是音視訊直播,速率低於某個值整個功能就沒法用了(這種類似於hot Observable)。
(2)節流(Throttling)

,說白了就是丟棄。消費不過來,就處理其中一部分,剩下的丟棄。至於處理哪些和丟棄哪些,就有不同的策略,也就是sample (or throttleLast)、throttleFirst、debounce (or throttleWithTimeout)這三種。還是舉音視訊直播的例子,在下游處理不過來的時候,就需要丟棄資料包。
(3)打包(buffer和window)。buffer和window基本一樣,只是輸出格式不太一樣。它們是把上游多個小包裹打成大包裹,分發到下游。這樣下游需要處理的包裹的個數就減少了。
(4)是一種特殊情況,阻塞住整個呼叫鏈(Callstack blocking)。之所以說這是一種特殊情況,是因為這種方式只適用於整個呼叫鏈都在一個執行緒上同步執行,這要求中間的各個operator都不能啟動新的執行緒。在平常使用中這種應該是比較少見的,因為我們經常使用subscribeOn或observeOn來切換執行執行緒,而且有些複雜的operator本身也會內部啟動新的執行緒來處理。另外,如果真的出現了完全同步的呼叫鏈,前面的(1)(2)(3)仍然有可能適用的,只不過這種阻塞的方式更簡單,不需要額外的支援。

舉個例子
比較一下(1)和(4)。(4)相當於很多車行駛在盤山公路上,而公路只有一條車道。那麼排在最前面的第一輛車就擋住了整條路,後面的車也只能排在後面。而(1)相當於銀行辦業務時的視窗叫號,視窗主動叫某個號過去(相當於請求),那個人才過去辦理。


然後,從細的方面解釋一下sample,throttleFirst,debounce。以及onBackpressureBuffer,onBackpressureDrop,onBackpressureBlock和ConnectableObservable(multicast)。

     sample就是throttleLast,取樣。類比一下音訊取樣
,8kHz的音訊就是每125微秒採一個值。sample可以配置成,比如每100毫秒取樣一個值,但100毫秒內上游可能過來很多值,選那個值呢,就是選最後那個值。所以它也叫throttleLast。
     throttleFirst跟sample類似,比如還是每100毫秒取樣一個值,但選這100毫秒內的第一個值。
     debounce,也叫throttleWithTimeout,名字裡就包含一個例子。比如,一個網路程式維護一個TCP連線,不停地收發資料,但中間沒資料可以收發的時候,就有間歇。這段間歇的時間,可以稱為idle time。當idle time超過一個預設值的時候,就算超時了(timeout),這個時候可能就需要把連線斷開了。實際上一些做server端的網路程式就是這麼工作的。每收發一個數據包之後,啟動一個計時器,等待idle time過去之後的超時,如果計時器到時之前,又有收發資料包的行為,那麼計時器重置,等待一個新的idle time。當計時器到時了,就time out了,這個連線就可以關閉了。debounce的行為,跟這個非常類似,可以用它來找到連續的收發事件之後idle time超時後的timeout事件。



最後還有一個新的問題需要說明。Backpressure有些Observable是支援的,有些不支援。但它們可以通過operator來轉化。

     onBackpressureBuffer,onBackpressureDrop,onBackpressureBlock就可以把一個不支援Backpressure的Observable轉成一個支援Backpressure的Observable(即支援request請求)。但轉完之後的策略不太相同。
     onBackpressureBuffer是不丟棄資料的處理方式。把上游收到的全部快取下來,等下游來請求再發給下游。相當於一個水庫。但上游太快,就會buffer溢位。
     onBackpressureDrop就是當上游來資料的時候,看下游有沒有需求,有需求就發給下游,否則上游來的資料就丟掉。
     onBackpressureBlock也是看下游有沒有需求,下游沒有需求,不丟棄,但試圖堵住上游的入口(能不能真堵得住還得看上游的情況了),自己並不快取。
     相反,有時候一些operator也能把一個支援Backpressure的Observable變成一個不支援Backpressure的Observable。比如,ConnectableObservable就是這樣。它類似於把一條河的主幹,在下游分成若干支流(但不太一樣的是每條支流的水量都跟主幹一樣,是拷貝的)。那麼很好理解,下游某個支流想對上游產生背壓,是不太可能的,它阻止不了水流流向其它支流。

 

轉載:https://www.zhihu.com/question/49618581