1. 程式人生 > >Rxjava - 操作符,線程操作的簡單使用

Rxjava - 操作符,線程操作的簡單使用

ever 個數 sin rri ply mar ++ put time

目錄

  • 創建操作符
    • 10種常用的操作符定義
    • 下面做幾個操作符的demo演示
      • create
      • from
      • repeat
      • defer
      • interval
  • Scheduler
    • 什麽是Scheduler?
    • 如何使用Scheduler

操作符是用來幹什麽的?Rxjava中的每一個操作符基本都是用來創建Observable,也就是被訂閱者。RxJava中常用的操作符包括:創建操作符,連接操作符,工具操作符,變換操作符,過濾操作符,條件操作符,布爾操作符,合並操作符。本次著重了解創建操作符的用法。

創建操作符

10種常用的操作符定義

摘自《RxJava實戰》


just:將一個或多個對象轉換成發射這個或這些對象的一個Observable;

from:將一個Interable,一個Future或者一個數組轉換成一個Observable;

create:使用一個函數從頭創建一個Observable;

defer:只有當訂閱者訂閱才創建Observable,為每個訂閱創建一個新的Observable;

range:創建一個發射指定範圍的整數序列的Observable;

interval:創建一個按照給定的時間間隔發射整數序列的Observable;

timer:創建一個在給定的延時之後發射單個數據的Observable;

empty:創建一個什麽都不做直接通知完成的Observable;

error:創建一個什麽都不做直接通知錯誤的Observable;

never:創建一個不發射任何數據的Observable。

下面做幾個操作符的demo演示

create

此處模擬create中報錯的場景


Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> observableEmitter) throws Exception {
        try {
            if (!observableEmitter.isDisposed()) {
                for (int i = 0; i < 10; i ++) {
                    observableEmitter.onNext(i);
                    String str = null;
                    str.length();
                }
                observableEmitter.onComplete();
            }
        } catch (Exception e) {
            observableEmitter.onError(e);
        }

    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println("Next: " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        System.out.println("Error: " + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence complete. ");
    }
});

運行結果:
技術分享圖片

just就不寫了,之前寫hello world的時候就用過了。

from

發射Iterable或者數組的每一項數據


Observable.fromArray("hello", "from").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

運行結果:
技術分享圖片

repeat

重復的發射原始Observable的數據序列,次數通過repeat(n)指定


Observable.just("hello repeat")
        .repeat(3)
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

運行結果:
技術分享圖片

defer

這裏我們在observable訂閱前睡眠1秒,我們發現只有當observable被訂閱了,發射的“hello defer”這條消息才被打印出來

Observable observable = Observable.defer(new Callable<ObservableSource<?>>() {
    @Override
    public ObservableSource<?> call() throws Exception {
        return Observable.just("hello defer");
    }
});
TimeUnit.SECONDS.sleep(1);
observable.subscribe(new Consumer<String>() {
    @Override
    public void accept(String str) throws Exception {
        System.out.println(str);
    }
});

運行結果:
技術分享圖片

interval

interval是按照固定的時間發射一個無限遞增的整數序列

Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println(aLong);
            }
        });
TimeUnit.SECONDS.sleep(10);

運行結果:
技術分享圖片

Scheduler

什麽是RxJava線程?RxJava是一個為異步線程而實現的庫,所以RxJava的特點就是異步,一個通過異步編程合理提高系統處理速度的。在默認情況下,RxJava是單線程的。用Observable發射數據,Observe接受和響應數據,各種操作符來加工處理數據流,都是在同一個線程中運行的,實現出來的就是一個同步的函數響應式。其實在Observer中接受和響應數據會牽涉到多線程來操作RxJava,這些多線程我們通過調度器(Scheduler)來實現。上述總結於《RxJava實戰》這本書中。

什麽是Scheduler?

Scheduler是RxJava對線程控制器的一個抽象,RxJava內置了多個Scheduler的實現。常用的調度器有single,newThread,computation,io,trampoline,Schedulers.from這五個。當然如果自帶的調度器不能滿足需求,我們是可以自己定義Executor來作為調度器的。

如何使用Scheduler

  • 切換newThread線程
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> observableEmitter) throws Exception {
        observableEmitter.onNext("hello");
        observableEmitter.onNext("world");
    }
}).observeOn(Schedulers.newThread())
.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});
  • 線程調度的兩種方法,也就是線程的切換
通過observeOn或者subscribeOn方法
Observable.just("hello", "world")
        .observeOn(Schedulers.newThread())
        .map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                return s.toUpperCase();
            }
        })
        .subscribeOn(Schedulers.single())
        .observeOn(Schedulers.io())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

不同線程調度器的使用場景

  • computation()
用於CPU密集型的計算任務,不適合I/O操作
  • io()
用於I/O密集型任務,支持異步阻塞I/O操作,這個調度器的線程池會根據需要增長,對於普通的計算任務,還是用Schedulers.computation()
  • newThread()
為每個任務創建一個新的線程。
  • single()
single擁有一個線程單例,所有的任務都在這一個線程中執行,當此線程中有任務執行時,它的任務將會按照先進先出的順序依次執行。
  • 幾種線程測試demo
Map map = new HashMap();
Observable.just("hello world")
        .subscribeOn(Schedulers.single())
        .map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                s = s.toUpperCase();
                map.put("map1", s);
                return s;
            }
        })
        .observeOn(Schedulers.io())
        .map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                s = s + " leo.";
                map.put("map2", s);
                return s;
            }
        })
        .subscribeOn(Schedulers.computation())
        .map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                s = s + "it is a test";
                map.put("map3", s);
                return s;
            }
        })
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                map.put("subscribe", s);
                System.out.println(s);
            }
        });

總結:操作符,線程操作的使用,大家可以多看看相關書籍,自己多敲代碼,帶著不懂去看源碼或者相關文檔,作為初學者,現在也只是看著書上的皮毛,自己敲著書上的demo來理解。學習之路慢慢,共勉。。。

Rxjava - 操作符,線程操作的簡單使用