1. 程式人生 > >RxJava2在Android中的使用之一 ----- 建立Observable

RxJava2在Android中的使用之一 ----- 建立Observable

前言

既然RxJava是基於觀察者模式,那麼就需要我們操作觀察者(Observer)和被觀察者(Observable),那麼怎麼建立Observable便是第一步。

建立Observable操作符

  • just():建立一個直接發射資料的Observable
  • from():從一個數組或列表中轉換成Observable
  • create():建立一個Observable
  • defer():當訂閱者訂閱時才建立Observable,為每一個訂閱建立一個新的Observable
  • range():建立一個指定範圍的序列Observable
  • interval():建立一個按照規定時間間隔發射資料的Observable
  • timer():延時發射資料的Observable
  • empty():直接完成的Observable
  • error():直接發射錯誤的Observable
  • never():不發射資料的Observable

just()

    // 發射資料
    Observable.just("abc")
              .subscribe({
               JLog.d("onNext:$it")
              }, {
                  JLog.d("onError:$it")
              }, {
                  JLog.d
("onComplete") }, { JLog.d("onSubscribe") })

列印:

 onSubscribe
 onNext:abc
 onComplete

當然,你可以使用Just發射一組資料:

Observable.just("1", "2", "3", "4", "5")
          .subscribe({
               JLog.d("onNext:$it")
           }, {
               JLog.e("onError:$it"
) }, { JLog.d("onComplete") }, { JLog.d("onSubscribe") })

列印:

    onSubscribe
    onNext:1
    onNext:2
    onNext:3
    onNext:4
    onNext:5
    onComplete

From()

當你使用Just發射一組資料時,你會發現它和From很相似:

Observable.fromArray("a", "b", "c")
          .subscribe({
              JLog.d(it)
          })
val items = arrayListOf("1", "2", "3")
Observable.fromIterable(items)
        .subscribe({
            JLog.d(it)
        })

列印:

    a
    b
    c
    1
    2
    3

create()

Observable.create(ObservableOnSubscribe<Int> {
          try
          {
              if (!it.isDisposed)
              {
                  for (i in 0..5)
                  {
                      it.onNext(i)
                  }
                  it.onComplete()
              }
          } catch (e: Exception)
          {
              it.onError(e)
          }
      }).subscribe({
          JLog.d("onNext->$it")
      }, {
          JLog.e("onError->$it")
      }, {
          JLog.d("onComplete")
      }, {
          JLog.d("onSubscribe")
      })      

列印:

    onSubscribe
    onNext->0
    onNext->1
    onNext->2
    onNext->3
    onNext->4
    onNext->5
    onComplete     

Defer()

defer操作符,直到有訂閱者訂閱它的時候,它才會建立一個Observable,以確保獲取的資料是最新的

 var a = 1
 val defer = Observable.defer {
     Observable.just(a)
 }
 val just = Observable.just(a)
 a++
 just.subscribe({ JLog.d("just:" + it.toString()) })
 defer.subscribe({
     JLog.d("defer:" + it.toString())
 })

我在a=1的時候用defer和just分別建立了一個Observable,在a++之後才消費他們,首先看一下結果:

just:1
defer:2

可以看到,just在建立的時候發射資料已經確定,而defer在有訂閱的時候才會建立Observable,保證了a是最新值

Timer()

延時操作符,預設在computation排程器上執行,會在延時一段時間後發射資料

Observable.timer(2, TimeUnit.SECONDS)
         .subscribe(object : Observer<Long>
          {
              override fun onComplete()
              {
              }

              override fun onSubscribe(d: Disposable)
              {
                  JLog.d("onSubscribe")
              }

              override fun onNext(t: Long)
              {
                  JLog.d("onNext")
                  JLog.d("onNext發射結果:$t")
              }

              override fun onError(e: Throwable)
              {
                  JLog.e(e.toString())
              }
          })

列印結果:

10-15 17:40:22.734 19180-19180/com.jzd.jutils.app D/JLog: onSubscribe
10-15 17:40:24.735 19180-19627/com.jzd.jutils.app D/JLog: onNext
            onNext發射結果:0

timer在延時2s後反射了一個0

Interval()

間隔操作符,預設在computation排程器上執行,會在固定時間間隔後發射資料

 Observable.interval(1, TimeUnit.SECONDS)
           .subscribe({ JLog.d(it.toString()) })

列印結果

10-15 17:44:15.022 21811-21902/com.jzd.jutils.app D/JLog: 0
10-15 17:44:16.021 21811-21902/com.jzd.jutils.app D/JLog: 1
10-15 17:44:17.021 21811-21902/com.jzd.jutils.app D/JLog: 2
10-15 17:44:18.021 21811-21902/com.jzd.jutils.app D/JLog: 3
10-15 17:44:19.021 21811-21902/com.jzd.jutils.app D/JLog: 4
10-15 17:44:20.021 21811-21902/com.jzd.jutils.app D/JLog: 5

interval也有3個引數的建立方法interval(long initialDelay, long period, TimeUnit unit)先指定延時時間,然後再輪詢傳送。。。不過,如果你第一次用interval,你會發現這樣寫,interval建立的發射器根本停不下來啊,有木有~坑爹啊。所以正確的操作方式應該是這樣的:

Observable.intervalRange(10, 5, 1, 1, TimeUnit.SECONDS)
          .subscribe({ JLog.d(it.toString()) })

對應的方法就是intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)

Repeat

Repeat操作符可以配合建立操作符使用,在建立時配置重複策略
使用repeat重複發射一組資料

Observable.just("1", "2", "3")
           .repeat(2)
           .subscribe({ JLog.d(it) })

列印

    1
    2
    3
    1
    2
    3

使用repeatWhen

Observable.just("1")
           .repeatWhen { Observable.timer(3, TimeUnit.SECONDS) }.subscribe({ JLog.d(it) })
JLog.d("---------------------------")

列印

10-15 19:49:01.646 5814-5814/com.jzd.jutils.app D/JLog: 1
    ---------------------------
10-15 19:49:04.646 5814-6133/com.jzd.jutils.app D/JLog: 1

可以看到3秒後重復發送了一次資料
使用repeatUntil

var count = 0
Observable.just("測試")
        .repeatUntil { count >= 5 }
        .subscribe({
            JLog.d("第${count++}次:$it")
        })

列印結果:

第0次:測試
第1次:測試
第2次:測試
第3次:測試
第4次:測試

可以看到,repeatUntil(BooleanSupplier stop)中,當getAsBoolean返回false時會重複發射資料,當返回true時,會終止發射資料。
常用的建立操作符基本介紹完了,使用這些操作符,我們就可以愉快的建立各式各樣的Observable了。