1. 程式人生 > >RxJava學習 - 6. Disposing

RxJava學習 - 6. Disposing

RxJava學習 - 6. Disposing

當你subscribe()一個Observable打算接收emissions的時候,增加了一個流處理這些emissions。當然,這樣做使用了資源。當我們這樣做的時候,我們想處置(dispose of)這些資源,這樣它們能被垃圾回收。有限的Observables呼叫onComplete()就是典型的安全處理辦法。如果你用的是無限的或者長時間執行的Observables,你可能會希望明確地停止emissions,處置訂閱相關的所有內容。事實上,你不再需要的活動的訂閱,垃圾回收器是無能為力的,明確地disposal可以防止記憶體洩漏。
Disposable連結了Observable和活動的Observer,你能呼叫它的dispose()方法停止emissions,處置Observer使用的全部資源。它還有一個isDisposed()方法,
指示它是否已經被處理掉了:

package io.reactivex.disposables;

public interface Disposable {
    void dispose();
    boolean isDisposed();
}

當你提供onNext()、onComplete()和onError()的lambdas作為subscribe()方法的引數,它實際上返回一個Disposable。
你可以使用它,在任何時候,使用它的dispose()方法停止emissions。例如,你能在5秒鐘以後停止接收Observable.interval()的emissions:

import io.
reactivex.Observable; import io.reactivex.disposables.Disposable; import java.util.concurrent.TimeUnit; public class Launcher { public static void main(String[] args) { Observable<Long> seconds = Observable.interval(1, TimeUnit.SECONDS); Disposable disposable =
seconds.subscribe(l -> System.out.println("Received: " + l)); //sleep 5 seconds sleep(5000); //dispose and stop emissions disposable.dispose(); //sleep 5 seconds to prove //there are no more emissions sleep(5000); } public static void sleep(int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }

這裡,我們讓Observable.interval()執行5秒,它有一個Observer,我們儲存了subscribe()方法返回的Disposable。然後我們能呼叫這個Disposable的dispose()方法,停止處理,釋放它使用的任何資源。然後,我們sleep了5秒,證明不再產生emissions。

Handling a Disposable within an Observer

通過onSubscribe()方法,在Observer的實現裡傳遞Disposable。你可以實現自己的Observer,使用onNext()、onComplete()或者onError()訪問Disposable。
它們呼叫dispose(),Observer不再想要emissions:

Observer<Integer> myObserver = new Observer<Integer>() {
    private Disposable disposable;
    
    @Override
    public void onSubscribe(Disposable disposable) {
        this.disposable = disposable;
    }
    @Override
    public void onNext(Integer value) {
        //has access to Disposable
    }
    @Override
    public void onError(Throwable e) {
        //has access to Disposable
    }
    @Override
    public void onComplete() {
        //has access to Disposable
    }
};

上面的Disposable從源發射,沿著chain到Observer,所以,chain內的每一步都可以訪問這個Disposable。
注意,把Observer傳給subscribe()方法,是void,不返回Disposable,因為它假設這個Observer會處理它。如果你不想明確地處理這個Disposable,而是讓RxJava處理它(這可能是個好主意,除非你有理由控制它),你能擴充套件ResourceObserver當作你的Observer,它使用預設的Disposable處理。把它傳給subscribeWith(),你會得到預設的Disposable:

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.ResourceObserver;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Long> source =
                Observable.interval(1, TimeUnit.SECONDS);
        ResourceObserver<Long> myObserver = new
                ResourceObserver<Long>() {
                    @Override
                    public void onNext(Long value) {
                        System.out.println(value);
                    }@Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                };
        //capture Disposable
        Disposable disposable = source.subscribeWith(myObserver);        
    }
}

Using CompositeDisposable

如果你需要管理幾個訂閱,並處置它們,可以使用CompositeDisposable。它實現了Disposable,在內部儲存了一個disposables的集合,
這樣你能一次處置全部訂閱:

import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    
    private static final CompositeDisposable disposables = new CompositeDisposable();
    
    public static void main(String[] args) {
        Observable<Long> seconds =
                Observable.interval(1, TimeUnit.SECONDS);
        //subscribe and capture disposables
        Disposable disposable1 =
                seconds.subscribe(l -> System.out.println("Observer 1: " + l));
        Disposable disposable2 =
                seconds.subscribe(l -> System.out.println("Observer 2: " + l));
        //put both disposables into CompositeDisposable
        disposables.addAll(disposable1, disposable2);
        //sleep 5 seconds
        sleep(5000);
        //dispose all disposables
        disposables.dispose();
        //sleep 5 seconds to prove
        //there are no more emissions
        sleep(5000);    
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }     
}

Handling Disposal with Observable.create()

如果你的Observable.create()返回一個長時間執行的,或者無限的Observable,你應該經常檢查ObservableEmitter的isDisposed()方法,看你是否能繼續發射emissions。
下面的例子,你應該使用Observable.range(),但是為了做示例,就在Observable.create()裡迴圈發射整數吧。在發射每個整數前,你應該確保ObservableEmitter指示沒被呼叫disposal:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable<Integer> source =
                Observable.create(observableEmitter -> {
                    try {
                        for (int i = 0; i < 1000; i++) {
                            while (!observableEmitter.isDisposed()) {
                                observableEmitter.onNext(i);
                            }
                            if (observableEmitter.isDisposed())
                                return;
                        }
                        observableEmitter.onComplete();
                    } catch (Throwable e) {
                        observableEmitter.onError(e);
                    }
                });        
    }
}

如果你的Observable.create()使用了資源,你應該處理資源的disposal,防止洩漏。ObservableEmitter有setCancellable()和setDisposable()方法。
在我們前面的例子裡,當disposal發生時,應該從ObservableValue刪除ChangeListener。我們能提供一個lambda來setCancellable(),在dispose()前,它將執行下面的動作:

    private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable) {
        return Observable.create(observableEmitter -> {
            //emit initial state
            observableEmitter.onNext(fxObservable.getValue());
            //emit value changes uses a listener
            final ChangeListener<T> listener =
                    (observableValue, prev, current) ->
                            observableEmitter.onNext(current);
            //add listener to ObservableValue
            fxObservable.addListener(listener);
            //Handle disposing by specifying cancellable
            observableEmitter.setCancellable(() ->
                    fxObservable.removeListener(listener));
        });
    }