1. 程式人生 > >Kotlin、RxJava學習筆記

Kotlin、RxJava學習筆記

關於Kotlin

  • kotlin關鍵字:

     object:定義靜態類
    
     lazy:懶屬性(延遲載入)
    
     when:用於判斷,相當於java中的switch()語句
    
     try{…}catch(){…}:用於捕捉異常
    
     let:預設當前這個物件作為閉包的it引數,返回值是函式裡面最後一行,或者指定return
    
     apply:呼叫某物件的apply函式,在函式範圍內,可以任意呼叫該物件的任意方法,並返回該物件
    
     with:一個單獨的函式,並不是Kotlin中的extension,所以呼叫方式有點不一樣,返回是最後一行,  然後可以直接呼叫物件的方法,感覺像是let和apply的結合
    
     constructor:用於標識次級建構函式,解決過載
    
     init:因為kotlin中的類定義同時也是建構函式,這個時候是不能進行操作的, 所以kotlin增加了一個新的關鍵字init用來處理類的初始化問題,init模組中的內容可以直接使用建構函式的引數
    
     open:由於kotlin中所有類和方法預設都是final的,不能直接繼承或重寫,需要繼承的類或類中要重寫的方法都應當在定義時新增open關鍵字
    
     abstract:描述一個抽象類,抽象類裡的方法如果不提供實現則需要用abstract關鍵字來描述抽象方法.抽象的方法預設是open的
    
     companion object: 伴隨物件, 修飾靜態方法
    
     is:等同於Java中的instanceof
    
    • kotlin靜態程式碼塊寫法(lambda表示式)
    companion object {
        private val TAG = MainActivity.javaClass.name
        init {
            RxJavaPlugins.setErrorHandler { throwable ->
                if (throwable is InterruptedIOException) {
                    Log.d(TAG, "Io interrupted")
                }
            }
        }
    }

關於RxJava

  • 事件流機制

    用兩根水管代替觀察者(產生事件的上游)和被觀察者(接收事件的下游),通過一定方式建立連線

    /**
     * RxJava鏈式操作
     */
    fun connect() {
        //建立一個上游Observable(lambda表示式寫法)
        Observable.create(ObservableOnSubscribe<Int> { emitter ->
            //ObservableEmitter:用來發出事件的,它可以分別發出next、complete和error三種類型的事件
/** * 傳送規則: * <ol> * <li> 上游可以傳送無限個onNext, 下游也可以接收無限個onNext. * <li> 當上遊傳送了一個onComplete後, 上游onComplete之後的事件將會繼續傳送, 而下游收到onComplete事件之後將不再繼續接收事件. * <li> 當上遊傳送了一個onError後, 上游onError之後的事件將繼續傳送, 而下游收到onError事件之後將不再繼續接收事件. * <li> 上游可以不傳送onComplete或onError. * <li> 最為關鍵的是onComplete和onError必須唯一併且互斥(需要程式碼自行控制) * </ol> */ emitter.onNext(1) emitter.onNext(2) emitter.onNext(3) emitter.onComplete() //建立連線 /** * subscribe()有多個過載的方法: * <ol> * <li> public final Disposable subscribe() {} * <li> public final Disposable subscribe(Consumer<? super T> onNext) {} * <li> public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} * <li> public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} * <li> public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} * <li> public final void subscribe(Observer<? super T> observer) {} * <li> 不帶任何引數的subscribe() 表示下游不關心任何事件,你上游儘管發你的資料去吧, 老子可不管你發什麼. * <li> 帶有一個Consumer引數的方法表示下游只關心onNext事件, 其他的事件我假裝沒看見. * </ol> */ }).subscribe(object : Observer<Int> { //建立一個下游 Observer //兩根水管之間的一個開關 private var mDisposable: Disposable? = null private var i: Int = 0 override fun onSubscribe(d: Disposable) { Log.d(TAG, "subscribe") mDisposable = d } override fun onNext(value: Int?) { Log.d(TAG, "next value = $value") i++ if (i == 2) { Log.d(TAG, "dispose") //呼叫dispose()並不會導致上游不再繼續傳送事件, 上游會繼續傳送剩餘的事件 mDisposable!!.dispose() Log.d(TAG, "isDisposed : " + mDisposable!!.isDisposed) } } override fun onError(e: Throwable) { Log.d(TAG, "error") } override fun onComplete() { Log.d(TAG, "complete") } }) }
  • 多執行緒實現事件非同步操作

    在RxJava中, 已經內建了很多執行緒選項供我們選擇, 例如:

    1. Schedulers.io() 代表io操作的執行緒, 通常用於網路,讀寫檔案等io密集型的操作
    2. Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
    3. Schedulers.newThread() 代表一個常規的新執行緒
    4. AndroidSchedulers.mainThread() 代表Android的主執行緒
    fun switchThread() {

        val observable = Observable.create(ObservableOnSubscribe<Int> { emitter ->
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().name)
            Log.d(TAG, "emitter 1")
            emitter.onNext(1)
        })

        val consumer = Consumer<Int> { integer ->
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().name)
            Log.d(TAG, "onNext: " + integer!!)
        }
        /**
         * 1. subscribeOn() 指定的是上游傳送事件的執行緒
         * 2. observeOn() 指定的是下游接收事件的執行緒
         * 3. 多次指定上游的執行緒只有第一次指定的有效
         * 4. 多次指定下游的執行緒是可以的, 每呼叫一次observeOn(), 下游的執行緒就會切換一次
         *
         */
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer)
    }
  • Map操作符

    Map是RxJava中最簡單的一個變換操作符,它的作用就是對上游傳送的每一個事件應用一個函式, 使得每一個事件都按照指定的函式去變化。

    1. 通過Map,可以將上游發來的事件轉換為任意的型別,可以是一個Object,也可以是一個集合
    2. FlatMap將一個傳送事件的上游Observable變換為多個傳送事件的Observables,然後將它們發射的事件合併後放進一個單獨的Observable裡,但是不保證事件的順序
    3. concatMap它和flatMap的作用幾乎一模一樣, 只是它的結果是嚴格按照上游傳送的順序來發送的
    /**
     * 巢狀的網路請求, 首先需要去請求註冊, 待註冊成功回調了再去請求登入的介面
     * 登入和註冊返回的都是一個上游Observable, flatMap操作符的作用就是把一個Observable轉換為另一個Observable
     */
    fun nestedNetWork(context: Context) {
        val api = RetrofitProvider.get().create(Api::class.java)
        api.register(RegisterRequestBean("zhangsan", 0)) //發起註冊請求
                .subscribeOn(Schedulers.io())                      //在IO執行緒進行網路請求
                .observeOn(AndroidSchedulers.mainThread())         //回到主執行緒去處理請求註冊結果
                .doOnNext {
                    //先根據註冊的響應結果去做一些操作
                }
                .observeOn(Schedulers.io())                        //回到IO執行緒去發起登入請求
                .flatMap { api.login(LoginRequestBean("zhangsan", 0)) }
                .observeOn(AndroidSchedulers.mainThread())         //回到主執行緒去處理請求登入的結果
                .subscribe({
                    Toast.makeText(context, "登入成功", Toast.LENGTH_SHORT).show()
                }, {
                    Toast.makeText(context, "登入失敗", Toast.LENGTH_SHORT).show()
                })
    }
  • Zip操作符

    Zip通過一個函式將多個Observable傳送的事件結合到一起,然後傳送這些組合到一起的事件。它按照嚴格的順序應用這個函式。它只發射與發射資料項最少的那個Observable一樣多的資料。

    /**
     * 一個介面需要展示使用者的一些資訊, 而這些資訊分別要從兩個伺服器介面中獲取, 而只有當兩個都獲取到了之後才能進行展示
     */
    fun getUserInfo() {
        val api = RetrofitProvider.get().create(Api::class.java)
        val observable1 = api.getUserBaseInfo(UserBaseInfoRequest("zhangsan", 0)).subscribeOn(Schedulers.io())

        val observable2 = api.getUserExtraInfo(UserExtraInfoRequest(24, "man")).subscribeOn(Schedulers.io())

        Observable.zip(observable1, observable2,
                BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo> { baseInfo, extraInfo ->
                    UserInfo(baseInfo, extraInfo)
                }).observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    //do something
                }
    }
  • Flowable操作符

    1. 解決上下游流速不均衡常規思路:

      1)從數量上進行治理, 減少傳送進水缸裡的事件,但是過濾事件會導致事件丟失

      2)從速度上進行治理, 減緩事件傳送進水缸的速度,但是減速又可能導致效能損失

    2. 同步和非同步的區別僅僅在於是否有快取池

    3. 大資料流用Flowable,小資料流用Observable
    4. 響應式拉取
    5. request當成下游處理事件的能力, 下游能處理幾個就告訴上游我要幾個
    6. Flowable裡預設有一個大小為128的水缸, 當上下游工作在不同的執行緒中時, 上游就會先把事件傳送到這個水缸中
    7. BackpressureStrategy.BUFFER無大小限制
    8. BackpressureStrategy.ERROR會在出現上下游流速不均衡的時候直接丟擲MissingBackpressureException異常
    9. BackpressureStrategy.DROP直接把存不下的事件丟棄
    10. BackpressureStrategy.LATEST只保留最新的事件
    11. 不是自己建立的Flowable:
      1) onBackpressureBuffer()
      2) onBackpressureDrop()
      3) onBackpressureLatest()
    12. 當上下游在同一個執行緒中的時候,在下游呼叫request(n)就會直接改變上游中的requested的值,多次呼叫便會疊加這個值,而上游每傳送一個事件之後便會去減少這個值,當這個值減少至0的時候,繼續傳送事件便會拋異常了
    13. 當上下游工作在不同的執行緒裡時,每一個執行緒裡都有一個requested,而我們呼叫request()時,實際上改變的是下游主執行緒中的requested,而上游中的requested的值是由RxJava內部呼叫request(n)去設定的,這個呼叫會在合適的時候自動觸發
object FlowableUse {

    private val TAG = FlowableUse.javaClass.name

    var mSubscription: Subscription? = null

    /**
     * 讀取一個文字檔案,需要一行一行讀取,然後處理並輸出,
     * 如果文字檔案很大的時候,比如幾十M的時候,全部先讀入記憶體肯定不是明智的做法,因此我們可以一邊讀取一邊處理
     */
    fun readFile() {
        Flowable.create(FlowableOnSubscribe<String> { emitter ->
            try {
                val reader = FileReader("test.txt")
                val br = BufferedReader(reader)

                val str = br.readLine()
                while (str != null && !emitter.isCancelled) {
                    while (emitter.requested() == 0L) {
                        if (emitter.isCancelled) {
                            break
                        }
                    }
                    emitter.onNext(str)
                }

                br.close()
                reader.close()

                emitter.onComplete()
            } catch (e: Exception) {
                emitter.onError(e)
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(object : Subscriber<String> {

                    override fun onSubscribe(s: Subscription) {
                        mSubscription = s
                        s.request(1)
                    }

                    override fun onNext(string: String) {
                        Log.d(TAG, string)
                        try {
                            Thread.sleep(2000)
                            mSubscription?.request(1)
                        } catch (e: InterruptedException) {
                            e.printStackTrace()
                        }

                    }

                    override fun onError(t: Throwable) {}

                    override fun onComplete() {}
                })
    }
}