RX之非同步操作(start、toAsync、startFuture、deferFuture、fromAction、fromRunnable、forEachFuture、runAsync)
需要在buld.gradle中加入compile 'io.reactivex:rxjava-async-util:0.21.0'
一、start
返回一個Observable,它發射一個類似於函式宣告的值。程式語言有很多種方法可以從運算結果中獲取值,它們的名字一般叫functions,
futures, actions, callables, runnables
等等。在Start
目錄下的這組操作符可以讓它們表現得像Observable,因此它們可以在Observables呼叫鏈中與其它Observable搭配使用。Start
操作符的多種RxJava實現都屬於可選的rxjava-async
模組。
rxjava-async
start
操作符,它接受一個函式作為引數,呼叫這個函式獲取一個值,然後返回一個會發射這個值給後續觀察者的Observable。
注意:這個函式只會被執行一次,即使多個觀察者訂閱這個返回的Observable。
5s後才輸出列印結果Observable<Integer> observable = Async.start(new Func0<Integer>() { @Override public Integer call() { //函式內為非同步操作 try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } return 20; } }); Subscriber<Integer> subscriber = new Subscriber<Integer>() { @Override public void onNext(Integer v) { Log.e(TAG, "onNext................." + v); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; observable.subscribe(subscriber);
執行結果:
二、toAsync
rxjava-async
模組還包含這幾個操作符:toAsync
, asyncAction
, 和asyncFunc
。它們接受一個函式或一個Action作為引數。對於函式(functions),這個操作符呼叫這個函式獲取一個值,然後返回一個會發射這個值給後續觀察者的Observable(和start
一樣)。對於動作(Action),過程類似,但是沒有返回值,在這種情況下,這個操作符在終止前會發射一個null
值。
注:這個函式或動作只會被執行一次,即使多個觀察者訂閱這個返回的Observable。
Func0<Observable<Integer>> func0 = Async.toAsync(new Func0<Integer>() { @Override public Integer call() { //函式內為非同步操作 try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } return 20; } }); Observable<Integer> observable = func0.call(); Subscriber<Integer> subscriber = new Subscriber<Integer>() { @Override public void onNext(Integer v) { Log.e(TAG, "onNext................." + v); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; observable.subscribe(subscriber);
執行結果:
三、startFuture
xjava-async
模組還包含一個startFuture
操作符,傳遞給它一個返回Future
的函式,startFuture
會立即呼叫這個函式獲取Future
物件,然後呼叫Future
的get()
方法嘗試獲取它的值。它返回一個發射這個值給後續觀察者的Observable。
final Future<Integer> future = Executors.newCachedThreadPool().submit(new Task());
Observable<Integer> observable = Async.startFuture(new Func0<Future<Integer>>() {
@Override
public Future<Integer> call() {
//函式內為非同步操作
return future;
}
});
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG, "onNext................." + v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable.subscribe(subscriber);
執行結果:
10s後才輸出列印結果
四、deferFuture
deferFuture中的函式可以非同步執行一些操作,當完成後返回一個Observable,但是這個Observable不會立刻發射資料,直到開始訂閱時。
rxjava-async
模組還包含一個deferFuture
操作符,傳遞給它一個返回Future
的函式(這個Future
返回一個Observable
),deferFuture
返回一個Observable,但是不會呼叫你提供的函式,知道有觀察者訂閱它返回的Observable。這時,它立即呼叫Future
的get()
方法,然後映象發射get()
方法返回的Observable發射的資料。
用這種方法,你可以在Observables呼叫鏈中包含一個返回Observable的Future
物件。
Observable<Long> observable = Observable.interval(1,TimeUnit.SECONDS).take(5);
final Future<Observable<Integer>> future = Executors.newCachedThreadPool().submit(new ObservableTask());
final Observable<Integer> observable1 = Async.deferFuture(new Func0<Future<Observable<Integer>>>() {
@Override
public Future<Observable<Integer>> call() {
return future;
}
});
final Subscriber<Integer> subscriber1 = new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext1................." + integer);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1.....................");
}
};
Subscriber<Long> subscriber = new Subscriber<Long>() {
@Override
public void onNext(Long integer) {
Log.e(TAG, "onNext................." + integer);
if(integer == 4){
observable1.subscribe(subscriber1);
}
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable.subscribe(subscriber);
執行結果:
五、fromAction
當fromAction()中的函式Action0執行完成後發射資料。Action無返回結果。rxjava-async
模組還包含一個fromAction
操作符,它接受一個Action
作為引數,返回一個Observable,一旦Action終止,它發射這個你傳遞給fromAction
的資料。
final int result = 1;
final Observable<Integer> observable = Async.fromAction(new Action0() {
@Override
public void call() {
for(int i=1;i<10;i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},result);
final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext1................." + integer);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1.....................");
}
};
observable.subscribe(subscriber);
執行結果:六、fromCallable
當fromCallable中的函式執行完成後將其結果發射出去。rxjava-async
模組還包含一個fromCallable
操作符,它接受一個Callable
作為引數,返回一個發射這個Callable
的結果的Observable。
final Observable<Integer> observable = Async.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 1;
for(int i=1;i<10;i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
result *= i;
}
return result;
}
});
final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext1................." + integer);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1.....................");
}
};
observable.subscribe(subscriber);
執行結果:七、fromRunnable
當run方法執行完成後傳送傳入的引數資料。rxjava-async
模組還包含一個fromRunnable
操作符,它接受一個Runnable
作為引數,返回一個Observable,一旦Runnable終止,它發射這個你傳遞給fromRunnable
的資料。
//當runnable執行完成後所需發射的資料
int result = 20;
Observable<Integer> observable = Async.fromRunnable(new Runnable() {
@Override
public void run() {
for(int i=1;i<10;i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},result);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext1................." + integer);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1.....................");
}
};
observable.subscribe(subscriber);
執行結果:
八、forEachFuture
rxjava-async
模組還包含一個forEachFuture
操作符。它其實不算Start
操作符的一個變體,而是有一些自己的特點。你傳遞一些典型的觀察者方法(如onNext, onError和onCompleted)給它,Observable會以通常的方式呼叫它。但是forEachFuture
自己返回一個Future
並且在get()
方法處阻塞,直到原始Observable執行完成,然後它返回,完成還是錯誤依賴於原始Observable是完成還是錯誤。
如果你想要一個函式阻塞直到Observable執行完成,可以使用這個操作符。
private void testForEachFuture() {
Async.forEachFuture(Observable.just(1, 2, 3, 4, 5),
new Action1<Integer>() {
@Override
public void call(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
newInfo(integer);
}
});
}
private void newInfo(Integer integer){
Log.e(TAG, "newInfo.................收到的資料是:" + integer+"..............轉換成:"+integer*20);
}
執行結果:
九、runAsync
任何時候都可以通過subscription的unsubscrip來停止訂閱。rxjava-async
模組還包含一個runAsync
操作符。它很特殊,返回一個叫做StoppableObservable
的特殊Observable。傳遞一個Action
和一個Scheduler
給runAsync
,它返回一個使用這個Action
產生資料的StoppableObservable
。這個Action
接受一個Observable
和一個Subscription
作為引數,它使用Subscription
檢查unsubscribed
條件,一旦發現條件為真就立即停止發射資料。在任何時候你都可以使用unsubscribe
方法手動停止一個StoppableObservable
(這會同時取消訂閱與這個StoppableObservable
關聯的Subscription
)。
由於runAsync
會立即呼叫Action
並開始發射資料,在你建立StoppableObservable之後到你的觀察者準備好接受資料之前這段時間裡,可能會有一部分資料會丟失。如果這不符合你的要求,可以使用runAsync
的一個變體,它也接受一個Subject
引數,傳遞一個ReplaySubject
給它,你可以獲取其它丟失的資料了。在RxJava中還有一個版本的From
操作符可以將Future轉換為Observable,與start
相似。
//action所需要執行的執行緒
Scheduler scheduler = Schedulers.newThread();
Action2<? super Observer<? super Integer>, ? super Subscription> action = new Action2<Observer<? super Integer>, Subscription>() {
@Override
public void call(Observer<? super Integer> observer, Subscription subscription) {
int i = 1;
while (!subscription.isUnsubscribed()){
Log.e(TAG, "call................observer.onNext");
observer.onNext(i);
i += 1;
if(i == 4){
subscription.unsubscribe();
observer.onCompleted();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
StoppableObservable<Integer> stoppableObservable = Async.runAsync(scheduler, action);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext................." + integer);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
stoppableObservable.subscribe(subscriber);
執行結果: