1. 程式人生 > >2018-7-20-Hello-RxJava

2018-7-20-Hello-RxJava

個人網站:https://tinuv.me/
更好的閱讀體驗請訪問:https://tinuv.me/2018/07/02/178.html

介紹

RxJava是JVM的響應式擴充套件,其實我現在也不知道它是什麼意思,因為我現在也沒有用過響應式程式設計,它提到了JVM,我也不知道它具體根JVM有多大的聯絡,它還提到了觀察者模式的設計模式,擴充套件的觀察者模式(It extends the observer pattern),這個我有一點了解.作為認識的一部分,我先擺在這裡.

官方的介紹是這樣的:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

翻譯成中文:

一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫

觀察者模式

RxJava使用了擴充套件的觀察者模式,與普通的觀察者模式不同,最大的不同在於普通的觀察者模式一次釋出一個內容,而RxJava會維護一個佇列,每出隊一次分發一個事件,觀察者接受和處理一個事件,當然也有一定的限制,限制如下:

  • 當發射器呼叫onComplete()函式時(可以看作是特定訊號),被觀察者繼續發射事件但觀察者不會接收.
  • 被觀察者傳遞一個Disposable(通過介面),當呼叫這個物件的Disposable.dispose()函式時,發射器停止發射事件

用RxJava實現通用的觀察者模式

值得注意的地方:

  • RxJava還依賴了Reactive Streams,如果找不到這個包會報ClassNotFoundExpection
  • 通過繼承Observable<T>的方式擴充套件被觀察者的時候(此示例使用這種方式),註冊,新增或者說註冊訂閱者和釋出訊息使用的是同一個函式,這是結構所決定的
  • 使用第二點所說的方式實現通用的觀察者模式似乎不能取消訂閱
被觀察者
import io.reactivex.Observable;
import io.reactivex.Observer;

public
class MyObservable extends Observable<String> { @Override protected void subscribeActual(Observer<? super String> observer) { observer.onNext("test1"); observer.onNext("test2"); observer.onNext("test3"); observer.onComplete(); } }
觀察者
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class MyObserver implements Observer<String> {
    @Override
    public void onSubscribe(Disposable disposable) {

    }

    @Override
    public void onNext(String s) {
        System.out.println("MyObsrever receiver"+s);
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onComplete() {
        System.out.println("MyObsrever receiver complete");
    }
}
主函式
public class Main {
    public static void main(String[] arg){
        MyObservable observable = new MyObservable();
        MyObserver observer = new MyObserver();
        MyObserver2 observer2 = new MyObserver2();
        observable.subscribeActual(observer);
        observable.subscribeActual(observer2);
    }
}

可以看到跟我自己實現的觀察者模式來比是差不多的

另一種方式

被觀察者
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;

public class MyObservable implements ObservableOnSubscribe<String> {
    
    @Override
    public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
        observableEmitter.onNext("test1");
        observableEmitter.onNext("test2");
        observableEmitter.onNext("test3");
        observableEmitter.onComplete();
    }
}
觀察者
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class MyObserver implements Observer<String> {
    @Override
    public void onSubscribe(Disposable disposable) {

    }

    @Override
    public void onNext(String s) {
        System.out.println("MyObsrever receiver"+s);
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onComplete() {
        System.out.println("MyObsrever receiver complete");
    }
}

主函式

import io.reactivex.Observable;

public class Main {
    public static void main(String[] arg){
        MyObserver observer = new MyObserver();
        MyObserver2 observer2 = new MyObserver2();
        Observable<String> observable = Observable.create(new MyObservable());
        observable.subscribe(observer);
        observable.subscribe(observer2);
    }
}

鏈式呼叫

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class Main {
    public static void main(String[] arg){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) {
                observableEmitter.onNext("test1");
                observableEmitter.onNext("test2");
                observableEmitter.onNext("test3");
                observableEmitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable disposable) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("receive message  "+s);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                System.out.println("receive complete!");
            }
        });
    }
}

小結

Rxjava當然不是隻是實現觀察者模式這麼簡單,但觀察者模式的的確確是它很重要的一個特徵.