RxJava學習 - 6. Disposing
RxJava學習 - 6. Disposing
- Handling a Disposable within an Observer
- Using CompositeDisposable
- Handling Disposal with Observable.create()
當你subscribe()一個Observable打算接收emissions的時候,增加了一個流處理這些emissions。當然,這樣做使用了資源。當我們這樣做的時候,我們想處置(dispose of)這些資源,這樣它們能被垃圾回收。有限的Observables呼叫onComplete()就是典型的安全處理辦法。如果你用的是無限的或者長時間執行的Observables,你可能會希望明確地停止emissions,處置訂閱相關的所有內容。事實上,你不再需要的活動的訂閱,垃圾回收器是無能為力的,明確地disposal可以防止記憶體洩漏。
Disposable連結了Observable和活動的Observer,你能呼叫它的dispose()方法停止emissions,處置Observer使用的全部資源。它還有一個isDisposed()方法,
指示它是否已經被處理掉了:
package io.reactivex.disposables;
public interface Disposable {
void dispose();
boolean isDisposed();
}
當你提供onNext()、onComplete()和onError()的lambdas作為subscribe()方法的引數,它實際上返回一個Disposable。
你可以使用它,在任何時候,使用它的dispose()方法停止emissions。例如,你能在5秒鐘以後停止接收Observable.interval()的emissions:
import io. reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS);
Disposable disposable =
seconds.subscribe(l -> System.out.println("Received: " + l));
//sleep 5 seconds
sleep(5000);
//dispose and stop emissions
disposable.dispose();
//sleep 5 seconds to prove
//there are no more emissions
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
這裡,我們讓Observable.interval()執行5秒,它有一個Observer,我們儲存了subscribe()方法返回的Disposable。然後我們能呼叫這個Disposable的dispose()方法,停止處理,釋放它使用的任何資源。然後,我們sleep了5秒,證明不再產生emissions。
Handling a Disposable within an Observer
通過onSubscribe()方法,在Observer的實現裡傳遞Disposable。你可以實現自己的Observer,使用onNext()、onComplete()或者onError()訪問Disposable。
它們呼叫dispose(),Observer不再想要emissions:
Observer<Integer> myObserver = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable disposable) {
this.disposable = disposable;
}
@Override
public void onNext(Integer value) {
//has access to Disposable
}
@Override
public void onError(Throwable e) {
//has access to Disposable
}
@Override
public void onComplete() {
//has access to Disposable
}
};
上面的Disposable從源發射,沿著chain到Observer,所以,chain內的每一步都可以訪問這個Disposable。
注意,把Observer傳給subscribe()方法,是void,不返回Disposable,因為它假設這個Observer會處理它。如果你不想明確地處理這個Disposable,而是讓RxJava處理它(這可能是個好主意,除非你有理由控制它),你能擴充套件ResourceObserver當作你的Observer,它使用預設的Disposable處理。把它傳給subscribeWith(),你會得到預設的Disposable:
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.ResourceObserver;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> source =
Observable.interval(1, TimeUnit.SECONDS);
ResourceObserver<Long> myObserver = new
ResourceObserver<Long>() {
@Override
public void onNext(Long value) {
System.out.println(value);
}@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
};
//capture Disposable
Disposable disposable = source.subscribeWith(myObserver);
}
}
Using CompositeDisposable
如果你需要管理幾個訂閱,並處置它們,可以使用CompositeDisposable。它實現了Disposable,在內部儲存了一個disposables的集合,
這樣你能一次處置全部訂閱:
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;
public class Launcher {
private static final CompositeDisposable disposables = new CompositeDisposable();
public static void main(String[] args) {
Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS);
//subscribe and capture disposables
Disposable disposable1 =
seconds.subscribe(l -> System.out.println("Observer 1: " + l));
Disposable disposable2 =
seconds.subscribe(l -> System.out.println("Observer 2: " + l));
//put both disposables into CompositeDisposable
disposables.addAll(disposable1, disposable2);
//sleep 5 seconds
sleep(5000);
//dispose all disposables
disposables.dispose();
//sleep 5 seconds to prove
//there are no more emissions
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Handling Disposal with Observable.create()
如果你的Observable.create()返回一個長時間執行的,或者無限的Observable,你應該經常檢查ObservableEmitter的isDisposed()方法,看你是否能繼續發射emissions。
下面的例子,你應該使用Observable.range(),但是為了做示例,就在Observable.create()裡迴圈發射整數吧。在發射每個整數前,你應該確保ObservableEmitter指示沒被呼叫disposal:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<Integer> source =
Observable.create(observableEmitter -> {
try {
for (int i = 0; i < 1000; i++) {
while (!observableEmitter.isDisposed()) {
observableEmitter.onNext(i);
}
if (observableEmitter.isDisposed())
return;
}
observableEmitter.onComplete();
} catch (Throwable e) {
observableEmitter.onError(e);
}
});
}
}
如果你的Observable.create()使用了資源,你應該處理資源的disposal,防止洩漏。ObservableEmitter有setCancellable()和setDisposable()方法。
在我們前面的例子裡,當disposal發生時,應該從ObservableValue刪除ChangeListener。我們能提供一個lambda來setCancellable(),在dispose()前,它將執行下面的動作:
private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable) {
return Observable.create(observableEmitter -> {
//emit initial state
observableEmitter.onNext(fxObservable.getValue());
//emit value changes uses a listener
final ChangeListener<T> listener =
(observableValue, prev, current) ->
observableEmitter.onNext(current);
//add listener to ObservableValue
fxObservable.addListener(listener);
//Handle disposing by specifying cancellable
observableEmitter.setCancellable(() ->
fxObservable.removeListener(listener));
});
}