RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering
RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering
一個Observable的生產速度可能比Observer的消費速度快。併發的時候,Observable鏈的不同operators執行在不同的Schedulers上。慢操作的後面會排隊,產生瓶頸。
可以使用Flowable處理瓶頸。但是,不是每個源都可以被backpressured。你不能讓Observable.interval()(甚至Flowable.interval())變慢,因為這些emissions是時間敏感的。使用者的輸入事件,比如點選按鈕,也是不能被backpressured。
幸好,下面介紹的這些operators,可以不使用backpressure應付快速產生的源,或者無法使用backpressure的時候。
Buffering
buffer()收集emissions,批量發射(一個list或者其他集合型別)。可以是固定的buffer,也可以是一個時間視窗,甚至被另一個Observable分割。
Fixed-size buffering
這個簡單的例子,buffer()接受count引數,以固定尺寸,成批發射:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 50)
. buffer(8)
.subscribe(System.out::println);
}
}
輸出是
[1, 2, 3, 4, 5, 6, 7, 8]
[9, 10, 11, 12, 13, 14, 15, 16]
[17, 18, 19, 20, 21, 22, 23, 24]
[25, 26, 27, 28, 29, 30, 31, 32]
[33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48]
[49, 50]
也可以提供第二個引數bufferSupplier,把元素放到一個自定義的集合,比如HashSet:
import io.reactivex.Observable;
import java.util.HashSet;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 50)
.buffer(8, HashSet::new)
.subscribe(System.out::println);
}
}
還可以提供一個skip引數,指定開始一個新的buffer前,要跳過多少元素。如果skip等於count,skip沒有影響。
如果不相等,事情變得很有趣。比如,你buffer兩個emissions,但是跳過三個:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 10)
.buffer(2, 3)
.subscribe(System.out::println);
}
}
輸出是
[1, 2]
[4, 5]
[7, 8]
[10]
如果skip小於count,就得到一個有趣的rolling buffers。比如buffer的大小是3,skip是1:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 10)
.buffer(3, 1)
.subscribe(System.out::println);
}
}
輸出是
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
[7, 8, 9]
[8, 9, 10]
[9, 10]
[10]
下面的例子,使用buffer(2, 1)發射,然後使用filter()過濾掉最後一個:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 10)
.buffer(2, 1)
.filter(c -> c.size() == 2)
.subscribe(System.out::println);
}
}
Time-based buffering
buffer()也可以使用固定的時間間隔。下面的程式碼,源每300毫秒發射,每個緩衝的list包含3個或者4個元素:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.buffer(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
sleep(4000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
也有可選的timeskip引數,它控制每個buffer開始的時間。
還可以提供第三個引數count,控制buffer的最大size。無論到時間了,還是buffer滿了,都會導致buffer發射。如果時間視窗關閉之前,buffer滿了,會發射空的buffer:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.buffer(1, TimeUnit.SECONDS, 2)
.subscribe(System.out::println);
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出是
[300, 600]
[900]
[1200, 1500]
[1800]
[2100, 2400]
[2700]
[3000, 3300]
[3600, 3900]
[]
[4200, 4500]
[4800]
基於時間的buffer(),使用computation Scheduler。
Boundary-based buffering
更強大的buffer()變種是接受另一個Observable作為boundary引數。其他Observable發射的型別不重要。重要的是,每當它發射,就開始另一個buffer。
比如前面的例子,我們使用每秒的Observable.interval()作為每300毫秒的Observable.interval()的邊界:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> cutOffs =
Observable.interval(1, TimeUnit.SECONDS);
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.buffer(cutOffs)
.subscribe(System.out::println);
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出是
[300, 600, 900]
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]
[4200, 4500, 4800]
Windowing
window()和buffer()幾乎一樣,只是它把元素緩衝進其他Observables,而不是集合。它返回一個Observable<Observable>。每個Observable emission會快取emissions,訂閱以後flush他們(很像GroupedObservable)。這樣,emissions有效了就發射,而不是list滿了才發射。
和buffer()一樣,每個批也可以是固定大小的,時間間隔的,或者來自另一個Observable。
Fixed-size windowing
我們修改以前的例子,使用window()把50個整數緩衝進長度為8的list。我可以響應式地轉換每個批次成為一個非集合,比如使用“|”級聯的字串:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 50)
.window(8)
.flatMapSingle(obs -> obs.reduce("", (total, next) ->
total + (total.equals("") ? "" : "|") + next))
.subscribe(System.out::println);
}
}
就像buffer(),可以提供skip引數,表示開始一個新視窗前,需要跳過多少emissions。下面的例子,視窗大小是2,跳過3個元素。然後接受每個視窗Observable,reduce成級聯的字串:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 50)
.window(2, 3)
.flatMapSingle(obs -> obs.reduce("", (total, next) ->
total + (total.equals("") ? "" : "|") + next))
.subscribe(System.out::println);
}
}
Time-based windowing
下面,有一個每300毫秒發射的Observable,每一秒切成一個分離的Observables,然後字串級聯:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.window(1, TimeUnit.SECONDS)
.flatMapSingle(obs -> obs.reduce("", (total, next) ->
total + (total.equals("") ? "" : "|") + next))
.subscribe(System.out::println);
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Boundary-based windowing
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> cutOffs =
Observable.interval(1, TimeUnit.SECONDS);
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.window(cutOffs)
.flatMapSingle(obs -> obs.reduce("", (total, next) ->
total + (total.equals("") ? "" : "|") + next))
.subscribe(System.out::println);
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Throttling
buffer()和window()把emissions分批放進集合或者Observables,這樣做一般是合併而部署忽略emissions。而throttle(),emissions發生得太快時,就忽略一部分。當假設快速的emissions是冗餘的或者不想要的時候,還是很有用的,比如重複點選button。
下面的例子,我們有三個Observable.interval()源,分別以100毫秒、300毫秒和2000毫秒的間隔發射。我們只接受第一個源的前十個元素,第二個的三個,第三個的兩個元素。我們使用Observable.concat()把他們連到一起:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<String> source1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 100) // map to elapsed time
.map(i -> "SOURCE 1: " + i)
.take(10);
Observable<String> source2 = Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.map(i -> "SOURCE 2: " + i)
.take(3);
Observable<String> source3 = Observable.interval(2000, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 2000) // map to elapsed time
.map(i -> "SOURCE 3: " + i)
.take(2);
Observable.concat(source1, source2, source3)
.subscribe(System.out::println);
sleep(6000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
接下來,我們使用throttle(),只選擇其中一部分,忽略其他的。
throttleLast() / sample()
throttleLast()(它的別名是sample())只發射每個固定時間間隔之內的最後的元素。
Observable.concat(source1, source2, source3)
.throttleLast(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
輸出是
SOURCE 1: 900
SOURCE 2: 900
SOURCE 3: 2000
可以看到,發射了每個一秒的最後一個emission。
如果想以更大的時間間隔節流,你會得到更少的emissions:
Observable.concat(source1, source2, source3)
.throttleLast(2, TimeUnit.SECONDS)
.subscribe(System.out::println);
輸出是
SOURCE 2: 900
SOURCE 3: 2000
如果減少時間間隔,會得到更多emissions:
Observable.concat(source1, source2, source3)
.throttleLast(500, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
輸出是
SOURCE 1: 400
SOURCE 1: 900
SOURCE 2: 300
SOURCE 2: 900
SOURCE 3: 2000
throttleFirst()
throttleFirst()只發射固定時間間隔的第一個元素:
Observable.concat(source1, source2, source3)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
輸出是
SOURCE 1: 100
SOURCE 2: 300
SOURCE 3: 2000
SOURCE 3: 4000
throttleFirst()和throttleLast()使用computation Scheduler,你可以使用第三個引數指定自己的Scheduler。
throttleWithTimeout() / debounce()
當快速發射的時候,throttleWithTimout()不發射任何東西,出現一個“沉默期”以後,就發射最後一個emission。
它接受時間間隔引數,表示多長時間沒收到emissions,就發射最後一個emission。比如前面的例子,我們的沉默期是一秒:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<String> source1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 100) // map to elapsed time
.map(i -> "SOURCE 1: " + i)
.take(10);
Observable<String> source2 = Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.map(i -> "SOURCE 2: " + i)
.take(3);
Observable<String> source3 = Observable.interval(2000, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 2000
相關推薦
RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering
RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering
Buffering
Fixed-size buffering
Time-based buffering
Boun
RxJava學習 - 5. Single, Completable, and Maybe
RxJava學習 - 5. Single, Completable, and Maybe
Single
Maybe
Completable
Single
Single實際上只發射一次。它有自己的SingleObserver介面:
i
HTML學習筆記-網上書城案例 AND 2018-11-14(22:14)
小夥伴們,又見啦! 一、HTML 網上書城案例 1.頁頭 實現程式碼:
<div>
<table>
<tr>
<td><img src="圖片路徑" height="50" width="100" /><
RxJava學習 - 13. Transformers and Custom Operators
RxJava學習 - 13. Transformers and Custom Operators
Transformers
ObservableTransformer
FlowableTransformer
Avoiding shared
RxJava學習 - 12. Flowables and Backpressure
RxJava學習 - 12. Flowables and Backpressure
Understanding backpressure
An example that needs backpressure
Introducing the Flo
RxJava學習 - 10. Concurrency and Parallelization
RxJava學習 - 10. Concurrency and Parallelization
Introducing RxJava concurrency
Keeping an application alive
Understanding
RxJava學習 - 9. Multicasting, Replaying, and Caching
RxJava學習 - 9. Multicasting, Replaying, and Caching
Understanding multicasting
Multicasting with operators
When to multicast
【Spark深入學習-11】Spark基本概念和運行模式
nmf 磁盤 大數據平臺 並不是 鼠標 .cn 管理系統 大型數據集 spa ----本節內容-------
1.大數據基礎
1.1大數據平臺基本框架
1.2學習大數據的基礎
1.3學習Spark的Hadoop基礎
2.Hadoop生態基本介紹
2.1
【ES】學習11-多桶排序
nbsp order key actions color 字符串 efi 結果 literal 聚合結果的排序
默認:桶會根據 doc_count 降序排列。
內置排序:
設置按doc_count升序排序:註意order,_count
GET /cars/transa
學習11
npe 接受 問題 重要 心情大好 作品 收藏 對比 推薦 7月1日,新聞,小兒推拿培訓。
柔小兒些,雅麗對著鏡頭道。,善解小兒意培訓女孩子。而且還濟南學習斷培訓增加。而且李嫂學習推拿可以每天都來看望嗎。而且練霓裳培訓性格,這個月必須給小緣發獎金。
,培訓全版權。這樣成
struts2學習(11)struts2驗證框架1.驗證簡介、內置驗證
oid -- 技術分享 ucc view 1.0 style text field 一、Struts2驗證簡介:
二、struts2內置驗證:
下面例子,需求是:為用戶註冊進行驗證;
com.cy.model.User.java:
package com.cy
Spark機器學習(11):協同過濾算法
設置 tel println print emp master ani alt tro 協同過濾(Collaborative Filtering,CF)算法是一種常用的推薦算法,它的思想就是找出相似的用戶或產品,向用戶推薦相似的物品,或者把物品推薦給相似的用戶。怎樣評價用戶
最權威的RXJaVa學習資料
學習 android music roi androi andro java學習 oid com aNDROID%E8%AF%BB%E5%86%99%E6%96%87%E4%BB%B6%E7%9A%84N%E7%A7%8D%E5%86%99%E6%B3%95
http:/
前端學習11.14
mbed out tac asc 蘋果公司 局限 com menu etime 轉載自:http://www.cnblogs.com/best/p/6096476.html#_lab2_2_0
1、新增加其它元素
1.1、meter
表示特定範圍內的數值,可用於工資、數量、
Nginx服務學習(11)-應用場景
code vpd ces oca eve 壓縮 pro off 傳輸 Nginx服務
Nginx靜態資源web服務
Nginx代理服務
Nginx負載均衡調度
Nginx緩存
CDN分發網絡
模塊配置
Syntax:sendfile on | off; ##文件
ROS學習 Writing a Simple Publisher and Subscriber & Examining them
Go got pub sco targe ide 代碼 int pie 本文主要部分全部來源於ROS官網的Tutorials.
創建Publisher Node
roscd beginner_tutorials
mkdir -p src
gedit src/talker.
python 的基礎 學習 11天 作業題
blog 結果 字母 log 知識 修改 參數 OS 作業 1、整理函數相關知識點,寫博客
2、寫函數,檢查獲取傳入列表或元組對象的所有奇數位索引對應的元素,並將其作為新列表返回給調用者。
3、寫函數,判斷用戶傳入的對象(字符串、列表、元組)長度是否大於5。
4、寫函數,檢
python學習11——if 語句。
python學習 wake 17. dream people som bubuko ant tab
people=int(input("How many people are there in this city?"))
cats = int(input("How m
python基礎學習11----函數
enc 局部作用域 語句 基礎 變量 each 不可變 不可 lis 一.函數的定義
def 函數名(參數列表):
函數體
return語句
return語句不寫或後邊不加任何對象即為return None
二.函數的參數
無參數
def func1
Python學習小記(2)---[list, iterator, and, or, zip, dict.keys]
1.List行為
可以用 alist[:] 相當於 alist.copy() ,可以建立一個 alist 的 shallo copy,但是直接對 alist[:] 操作卻會直接操作 alist 物件