Kotlin、RxJava學習筆記
阿新 • • 發佈:2019-02-13
關於Kotlin
kotlin關鍵字:
object:定義靜態類 lazy:懶屬性(延遲載入) when:用於判斷,相當於java中的switch()語句 try{…}catch(){…}:用於捕捉異常 let:預設當前這個物件作為閉包的it引數,返回值是函式裡面最後一行,或者指定return apply:呼叫某物件的apply函式,在函式範圍內,可以任意呼叫該物件的任意方法,並返回該物件 with:一個單獨的函式,並不是Kotlin中的extension,所以呼叫方式有點不一樣,返回是最後一行, 然後可以直接呼叫物件的方法,感覺像是let和apply的結合 constructor:用於標識次級建構函式,解決過載 init:因為kotlin中的類定義同時也是建構函式,這個時候是不能進行操作的, 所以kotlin增加了一個新的關鍵字init用來處理類的初始化問題,init模組中的內容可以直接使用建構函式的引數 open:由於kotlin中所有類和方法預設都是final的,不能直接繼承或重寫,需要繼承的類或類中要重寫的方法都應當在定義時新增open關鍵字 abstract:描述一個抽象類,抽象類裡的方法如果不提供實現則需要用abstract關鍵字來描述抽象方法.抽象的方法預設是open的 companion object: 伴隨物件, 修飾靜態方法 is:等同於Java中的instanceof
- kotlin靜態程式碼塊寫法(lambda表示式)
companion object {
private val TAG = MainActivity.javaClass.name
init {
RxJavaPlugins.setErrorHandler { throwable ->
if (throwable is InterruptedIOException) {
Log.d(TAG, "Io interrupted")
}
}
}
}
關於RxJava
事件流機制
用兩根水管代替觀察者(產生事件的上游)和被觀察者(接收事件的下游),通過一定方式建立連線
/**
* RxJava鏈式操作
*/
fun connect() {
//建立一個上游Observable(lambda表示式寫法)
Observable.create(ObservableOnSubscribe<Int> { emitter ->
//ObservableEmitter:用來發出事件的,它可以分別發出next、complete和error三種類型的事件
/**
* 傳送規則:
* <ol>
* <li> 上游可以傳送無限個onNext, 下游也可以接收無限個onNext.
* <li> 當上遊傳送了一個onComplete後, 上游onComplete之後的事件將會繼續傳送, 而下游收到onComplete事件之後將不再繼續接收事件.
* <li> 當上遊傳送了一個onError後, 上游onError之後的事件將繼續傳送, 而下游收到onError事件之後將不再繼續接收事件.
* <li> 上游可以不傳送onComplete或onError.
* <li> 最為關鍵的是onComplete和onError必須唯一併且互斥(需要程式碼自行控制)
* </ol>
*/
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
//建立連線
/**
* subscribe()有多個過載的方法:
* <ol>
* <li> public final Disposable subscribe() {}
* <li> public final Disposable subscribe(Consumer<? super T> onNext) {}
* <li> public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
* <li> public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
* <li> public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
* <li> public final void subscribe(Observer<? super T> observer) {}
* <li> 不帶任何引數的subscribe() 表示下游不關心任何事件,你上游儘管發你的資料去吧, 老子可不管你發什麼.
* <li> 帶有一個Consumer引數的方法表示下游只關心onNext事件, 其他的事件我假裝沒看見.
* </ol>
*/
}).subscribe(object : Observer<Int> {
//建立一個下游 Observer
//兩根水管之間的一個開關
private var mDisposable: Disposable? = null
private var i: Int = 0
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "subscribe")
mDisposable = d
}
override fun onNext(value: Int?) {
Log.d(TAG, "next value = $value")
i++
if (i == 2) {
Log.d(TAG, "dispose")
//呼叫dispose()並不會導致上游不再繼續傳送事件, 上游會繼續傳送剩餘的事件
mDisposable!!.dispose()
Log.d(TAG, "isDisposed : " + mDisposable!!.isDisposed)
}
}
override fun onError(e: Throwable) {
Log.d(TAG, "error")
}
override fun onComplete() {
Log.d(TAG, "complete")
}
})
}
多執行緒實現事件非同步操作
在RxJava中, 已經內建了很多執行緒選項供我們選擇, 例如:
- Schedulers.io() 代表io操作的執行緒, 通常用於網路,讀寫檔案等io密集型的操作
- Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
- Schedulers.newThread() 代表一個常規的新執行緒
- AndroidSchedulers.mainThread() 代表Android的主執行緒
fun switchThread() {
val observable = Observable.create(ObservableOnSubscribe<Int> { emitter ->
Log.d(TAG, "Observable thread is : " + Thread.currentThread().name)
Log.d(TAG, "emitter 1")
emitter.onNext(1)
})
val consumer = Consumer<Int> { integer ->
Log.d(TAG, "Observer thread is :" + Thread.currentThread().name)
Log.d(TAG, "onNext: " + integer!!)
}
/**
* 1. subscribeOn() 指定的是上游傳送事件的執行緒
* 2. observeOn() 指定的是下游接收事件的執行緒
* 3. 多次指定上游的執行緒只有第一次指定的有效
* 4. 多次指定下游的執行緒是可以的, 每呼叫一次observeOn(), 下游的執行緒就會切換一次
*
*/
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer)
}
Map操作符
Map是RxJava中最簡單的一個變換操作符,它的作用就是對上游傳送的每一個事件應用一個函式, 使得每一個事件都按照指定的函式去變化。
- 通過Map,可以將上游發來的事件轉換為任意的型別,可以是一個Object,也可以是一個集合
- FlatMap將一個傳送事件的上游Observable變換為多個傳送事件的Observables,然後將它們發射的事件合併後放進一個單獨的Observable裡,但是不保證事件的順序
- concatMap它和flatMap的作用幾乎一模一樣, 只是它的結果是嚴格按照上游傳送的順序來發送的
/**
* 巢狀的網路請求, 首先需要去請求註冊, 待註冊成功回調了再去請求登入的介面
* 登入和註冊返回的都是一個上游Observable, flatMap操作符的作用就是把一個Observable轉換為另一個Observable
*/
fun nestedNetWork(context: Context) {
val api = RetrofitProvider.get().create(Api::class.java)
api.register(RegisterRequestBean("zhangsan", 0)) //發起註冊請求
.subscribeOn(Schedulers.io()) //在IO執行緒進行網路請求
.observeOn(AndroidSchedulers.mainThread()) //回到主執行緒去處理請求註冊結果
.doOnNext {
//先根據註冊的響應結果去做一些操作
}
.observeOn(Schedulers.io()) //回到IO執行緒去發起登入請求
.flatMap { api.login(LoginRequestBean("zhangsan", 0)) }
.observeOn(AndroidSchedulers.mainThread()) //回到主執行緒去處理請求登入的結果
.subscribe({
Toast.makeText(context, "登入成功", Toast.LENGTH_SHORT).show()
}, {
Toast.makeText(context, "登入失敗", Toast.LENGTH_SHORT).show()
})
}
Zip操作符
Zip通過一個函式將多個Observable傳送的事件結合到一起,然後傳送這些組合到一起的事件。它按照嚴格的順序應用這個函式。它只發射與發射資料項最少的那個Observable一樣多的資料。
/**
* 一個介面需要展示使用者的一些資訊, 而這些資訊分別要從兩個伺服器介面中獲取, 而只有當兩個都獲取到了之後才能進行展示
*/
fun getUserInfo() {
val api = RetrofitProvider.get().create(Api::class.java)
val observable1 = api.getUserBaseInfo(UserBaseInfoRequest("zhangsan", 0)).subscribeOn(Schedulers.io())
val observable2 = api.getUserExtraInfo(UserExtraInfoRequest(24, "man")).subscribeOn(Schedulers.io())
Observable.zip(observable1, observable2,
BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo> { baseInfo, extraInfo ->
UserInfo(baseInfo, extraInfo)
}).observeOn(AndroidSchedulers.mainThread())
.subscribe {
//do something
}
}
Flowable操作符
解決上下游流速不均衡常規思路:
1)從數量上進行治理, 減少傳送進水缸裡的事件,但是過濾事件會導致事件丟失
2)從速度上進行治理, 減緩事件傳送進水缸的速度,但是減速又可能導致效能損失
同步和非同步的區別僅僅在於是否有快取池
- 大資料流用Flowable,小資料流用Observable
- 響應式拉取
- request當成下游處理事件的能力, 下游能處理幾個就告訴上游我要幾個
- Flowable裡預設有一個大小為128的水缸, 當上下游工作在不同的執行緒中時, 上游就會先把事件傳送到這個水缸中
- BackpressureStrategy.BUFFER無大小限制
- BackpressureStrategy.ERROR會在出現上下游流速不均衡的時候直接丟擲MissingBackpressureException異常
- BackpressureStrategy.DROP直接把存不下的事件丟棄
- BackpressureStrategy.LATEST只保留最新的事件
- 不是自己建立的Flowable:
1) onBackpressureBuffer()
2) onBackpressureDrop()
3) onBackpressureLatest() - 當上下游在同一個執行緒中的時候,在下游呼叫request(n)就會直接改變上游中的requested的值,多次呼叫便會疊加這個值,而上游每傳送一個事件之後便會去減少這個值,當這個值減少至0的時候,繼續傳送事件便會拋異常了
- 當上下游工作在不同的執行緒裡時,每一個執行緒裡都有一個requested,而我們呼叫request()時,實際上改變的是下游主執行緒中的requested,而上游中的requested的值是由RxJava內部呼叫request(n)去設定的,這個呼叫會在合適的時候自動觸發
object FlowableUse {
private val TAG = FlowableUse.javaClass.name
var mSubscription: Subscription? = null
/**
* 讀取一個文字檔案,需要一行一行讀取,然後處理並輸出,
* 如果文字檔案很大的時候,比如幾十M的時候,全部先讀入記憶體肯定不是明智的做法,因此我們可以一邊讀取一邊處理
*/
fun readFile() {
Flowable.create(FlowableOnSubscribe<String> { emitter ->
try {
val reader = FileReader("test.txt")
val br = BufferedReader(reader)
val str = br.readLine()
while (str != null && !emitter.isCancelled) {
while (emitter.requested() == 0L) {
if (emitter.isCancelled) {
break
}
}
emitter.onNext(str)
}
br.close()
reader.close()
emitter.onComplete()
} catch (e: Exception) {
emitter.onError(e)
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(object : Subscriber<String> {
override fun onSubscribe(s: Subscription) {
mSubscription = s
s.request(1)
}
override fun onNext(string: String) {
Log.d(TAG, string)
try {
Thread.sleep(2000)
mSubscription?.request(1)
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
override fun onError(t: Throwable) {}
override fun onComplete() {}
})
}
}