1. 程式人生 > >RX之非同步操作(start、toAsync、startFuture、deferFuture、fromAction、fromRunnable、forEachFuture、runAsync)

RX之非同步操作(start、toAsync、startFuture、deferFuture、fromAction、fromRunnable、forEachFuture、runAsync)

需要在buld.gradle中加入compile 'io.reactivex:rxjava-async-util:0.21.0'

一、start

返回一個Observable,它發射一個類似於函式宣告的值。程式語言有很多種方法可以從運算結果中獲取值,它們的名字一般叫functions, futures, actions, callables, runnables等等。在Start目錄下的這組操作符可以讓它們表現得像Observable,因此它們可以在Observables呼叫鏈中與其它Observable搭配使用。Start操作符的多種RxJava實現都屬於可選的rxjava-async模組。

rxjava-async

模組包含start操作符,它接受一個函式作為引數,呼叫這個函式獲取一個值,然後返回一個會發射這個值給後續觀察者的Observable。

注意:這個函式只會被執行一次,即使多個觀察者訂閱這個返回的Observable。

 Observable<Integer> observable = Async.start(new Func0<Integer>() {
            @Override
            public Integer call() {
                //函式內為非同步操作
                try {
                    Thread.sleep(5 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 20;
            }
        });

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer v) {
                Log.e(TAG, "onNext................." + v);

            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable.subscribe(subscriber);
5s後才輸出列印結果
執行結果:


二、toAsync

rxjava-async模組還包含這幾個操作符:toAsyncasyncAction, 和asyncFunc。它們接受一個函式或一個Action作為引數。對於函式(functions),這個操作符呼叫這個函式獲取一個值,然後返回一個會發射這個值給後續觀察者的Observable(和start一樣)。對於動作(Action),過程類似,但是沒有返回值,在這種情況下,這個操作符在終止前會發射一個null值。

注:這個函式或動作只會被執行一次,即使多個觀察者訂閱這個返回的Observable。

 Func0<Observable<Integer>> func0 = Async.toAsync(new Func0<Integer>() {
            @Override
            public Integer call() {
                //函式內為非同步操作
                try {
                    Thread.sleep(5 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 20;
            }
        });
        Observable<Integer> observable = func0.call();

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer v) {
                Log.e(TAG, "onNext................." + v);

            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };
        observable.subscribe(subscriber);

執行結果:


三、startFuture

xjava-async模組還包含一個startFuture操作符,傳遞給它一個返回Future的函式,startFuture會立即呼叫這個函式獲取Future物件,然後呼叫Futureget()方法嘗試獲取它的值。它返回一個發射這個值給後續觀察者的Observable。

final Future<Integer> future = Executors.newCachedThreadPool().submit(new Task());
        Observable<Integer> observable = Async.startFuture(new Func0<Future<Integer>>() {
            @Override
            public Future<Integer> call() {
                //函式內為非同步操作
                return future;
            }
        });


        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer v) {
                Log.e(TAG, "onNext................." + v);

            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable.subscribe(subscriber);

執行結果:

10s後才輸出列印結果


四、deferFuture

deferFuture中的函式可以非同步執行一些操作,當完成後返回一個Observable,但是這個Observable不會立刻發射資料,直到開始訂閱時。 

rxjava-async模組還包含一個deferFuture操作符,傳遞給它一個返回Future的函式(這個Future返回一個Observable),deferFuture返回一個Observable,但是不會呼叫你提供的函式,知道有觀察者訂閱它返回的Observable。這時,它立即呼叫Futureget()方法,然後映象發射get()方法返回的Observable發射的資料。

用這種方法,你可以在Observables呼叫鏈中包含一個返回Observable的Future物件。

 Observable<Long> observable = Observable.interval(1,TimeUnit.SECONDS).take(5);

        final Future<Observable<Integer>> future = Executors.newCachedThreadPool().submit(new ObservableTask());
        final Observable<Integer> observable1 = Async.deferFuture(new Func0<Future<Observable<Integer>>>() {
            @Override
            public Future<Observable<Integer>> call() {
                return future;
            }
        });

        final Subscriber<Integer> subscriber1 = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext1................." + integer);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted1.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError1.....................");
            }
        };

        Subscriber<Long> subscriber = new Subscriber<Long>() {
            @Override
            public void onNext(Long integer) {
                Log.e(TAG, "onNext................." + integer);
                if(integer == 4){
                    observable1.subscribe(subscriber1);
                }
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };


        observable.subscribe(subscriber);

執行結果:


五、fromAction

當fromAction()中的函式Action0執行完成後發射資料。Action無返回結果。rxjava-async模組還包含一個fromAction操作符,它接受一個Action作為引數,返回一個Observable,一旦Action終止,它發射這個你傳遞給fromAction的資料。

        final int result = 1;
        final Observable<Integer> observable = Async.fromAction(new Action0() {
            @Override
            public void call() {

                for(int i=1;i<10;i++){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        },result);

        final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext1................." + integer);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted1.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError1.....................");
            }
        };

        observable.subscribe(subscriber);
執行結果:


六、fromCallable

當fromCallable中的函式執行完成後將其結果發射出去。rxjava-async模組還包含一個fromCallable操作符,它接受一個Callable作為引數,返回一個發射這個Callable的結果的Observable。

 final Observable<Integer> observable = Async.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int result = 1;
                for(int i=1;i<10;i++){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    result *= i;
                }
                return result;
            }
        });

        final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext1................." + integer);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted1.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError1.....................");
            }
        };

        observable.subscribe(subscriber);
執行結果:


七、fromRunnable

當run方法執行完成後傳送傳入的引數資料。rxjava-async模組還包含一個fromRunnable操作符,它接受一個Runnable作為引數,返回一個Observable,一旦Runnable終止,它發射這個你傳遞給fromRunnable的資料。

        //當runnable執行完成後所需發射的資料
        int result = 20;
         Observable<Integer> observable = Async.fromRunnable(new Runnable() {
             @Override
             public void run() {
                 for(int i=1;i<10;i++){
                     try {
                         Thread.sleep(1000);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
                 }
             }
         },result);

         Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext1................." + integer);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted1.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError1.....................");
            }
        };

        observable.subscribe(subscriber);
執行結果:


八、forEachFuture

rxjava-async模組還包含一個forEachFuture操作符。它其實不算Start操作符的一個變體,而是有一些自己的特點。你傳遞一些典型的觀察者方法(如onNext, onError和onCompleted)給它,Observable會以通常的方式呼叫它。但是forEachFuture自己返回一個Future並且在get()方法處阻塞,直到原始Observable執行完成,然後它返回,完成還是錯誤依賴於原始Observable是完成還是錯誤。

如果你想要一個函式阻塞直到Observable執行完成,可以使用這個操作符。


private void testForEachFuture() {

        Async.forEachFuture(Observable.just(1, 2, 3, 4, 5),
                new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        newInfo(integer);

                    }
                });

    }


    private void newInfo(Integer integer){
        Log.e(TAG, "newInfo.................收到的資料是:" + integer+"..............轉換成:"+integer*20);
    }
執行結果:



九、runAsync

任何時候都可以通過subscription的unsubscrip來停止訂閱。rxjava-async模組還包含一個runAsync操作符。它很特殊,返回一個叫做StoppableObservable的特殊Observable。傳遞一個Action和一個SchedulerrunAsync,它返回一個使用這個Action產生資料的StoppableObservable。這個Action接受一個Observable和一個Subscription作為引數,它使用Subscription檢查unsubscribed條件,一旦發現條件為真就立即停止發射資料。在任何時候你都可以使用unsubscribe方法手動停止一個StoppableObservable(這會同時取消訂閱與這個StoppableObservable關聯的Subscription)。

由於runAsync會立即呼叫Action並開始發射資料,在你建立StoppableObservable之後到你的觀察者準備好接受資料之前這段時間裡,可能會有一部分資料會丟失。如果這不符合你的要求,可以使用runAsync的一個變體,它也接受一個Subject引數,傳遞一個ReplaySubject給它,你可以獲取其它丟失的資料了。在RxJava中還有一個版本的From操作符可以將Future轉換為Observable,與start相似。

 //action所需要執行的執行緒
        Scheduler scheduler = Schedulers.newThread();
        Action2<? super Observer<? super Integer>, ? super Subscription> action = new Action2<Observer<? super Integer>, Subscription>() {
            @Override
            public void call(Observer<? super Integer> observer, Subscription subscription) {
               int i = 1;
                while (!subscription.isUnsubscribed()){
                    Log.e(TAG, "call................observer.onNext");
                    observer.onNext(i);
                    i += 1;
                    if(i == 4){
                        subscription.unsubscribe();
                        observer.onCompleted();
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        };

       StoppableObservable<Integer> stoppableObservable  =  Async.runAsync(scheduler, action);

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext................." + integer);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        stoppableObservable.subscribe(subscriber);

執行結果: