RxJava RxAndroid RxLifecycle基本使用(使用Kotlin語言)
簡介
RxJava是基於響應式程式設計的框架,響應式程式設計的思想就是一個事件發生後,監聽著這個事件的監聽器馬上做出響應。類似於平常的開關燈。當我們開啟燈的開關時,燈馬上亮了;當我們關閉燈的開關時,燈馬上熄了。這個過程中,燈對我們控制開關的事件做出了響應。在Android中,設定按鈕監聽器也用到了響應式的思想,當有點選事件發生時,OnClickListener馬上執行。也就是說OnClickListener時刻觀察著按鈕,當按鈕被點選時,OnClickListener馬上做出了響應。
RxJava中的三個基本概念:觀察者Observer,被觀察者Observable,訂閱subscribe。當觀察者訂閱了被觀察者,被觀察者有事件發生時,觀察者可以做出響應。
RxAndroid是JakeWharton對RxJava做的一個擴充套件,主要是為了讓Android中更好的使用RxJava
RxJava的Github地址為:https://github.com/ReactiveX/RxJava
RxAndroid的Github地址為:https://github.com/ReactiveX/RxAndroid
使用
1.在app模組的build.gradle的dependencies中引入RxJava和RxAndroid:
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.2'
2.在MainActivity中建立被觀察者和觀察者,然後建立訂閱,例:(本例中佈局非常簡單,只有一個id為text的TextView,故不再給出佈局程式碼)
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//建立一個被觀察者,發出整型資料
val observable = createObservable()
//建立一個觀察者,接收整型資料
val observer = createObserver()
//建立訂閱
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
}
private fun createObservable(): Observable<Int> {
return Observable.create {
for (i in 0..9) {
//通知觀察者執行onNext()方法
it.onNext(i)
}
//通知觀察者資料傳送完成
it.onComplete()
}
}
private fun createObserver(): Observer<Int> {
return object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
text.append("subscribe\n")
}
override fun onNext(integer: Int) {
text.append("$integer\n")
}
override fun onComplete() {
text.append("complete.")
}
override fun onError(e: Throwable) {
text.append(e.message)
}
}
}
}
注:
(1)被觀察者需要重寫subscribe()方法,在此方法中使用emitter發出資料,Kotlin對於單方法單引數的物件有一個語法糖,使用it即可表示這個單引數emitter
(2)觀察者需要重寫onSubscribe()、onNext()、onError()、onComplete()方法。
onSubscribe():訂閱開始時呼叫
onNext():執行發射器發出的事件
onError():當程式出錯時,執行onError,訂閱結束
onComplete():當程式完成後,執行onComplete,訂閱結束
(3)使用observable.subscribe(observer)建立訂閱。subscribeOn(Scheduler scheduler)決定被觀察者發射事件的執行緒,observeOn(Scheduler scheduler)決定觀察者接收事件的執行緒。
常用的執行緒型別有:
Schedulers.newThread():新執行緒
Schedulers.io():IO執行緒
Schedulers.computation():計算執行緒
AndroidSchedulers.mainThread():主執行緒(RxAndroid庫中的)
執行程式,可以看到如下結果:
可以看到observable中發出的事件被observer依次執行了
map操作符
3.使用map在資料傳遞過程中進行資料型別轉換
例:在MainActivity中建立發出整型資料的被觀察者、接收字串資料的觀察者,使用map將整型資料轉換成字串,建立訂閱
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//建立一個被觀察者,發出整型資料
val observable = createObservable()
//建立一個觀察者,接收字串資料
val observer = createObserver()
//建立訂閱
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map{
"string $it"
}
.subscribe(observer)
}
private fun createObservable(): Observable<Int> {
return Observable.create {
for (i in 0..9) {
//通知觀察者執行onNext()方法
it.onNext(i)
}
//通知觀察者資料傳送完成
it.onComplete()
}
}
private fun createObserver(): Observer<String> {
return object : Observer<String> {
override fun onSubscribe(d: Disposable) {
text.append("subscribe\n")
}
override fun onNext(string: String) {
text.append("$string\n")
}
override fun onComplete() {
text.append("complete.")
}
override fun onError(e: Throwable) {
text.append(e.message)
}
}
}
}
注:map()函式中需要傳入一個Function<T1,T2>進行型別轉換,Function中需要重寫apply函式,這裡同樣使用了kotlin的語法糖,it表示傳進來的整數,map函式最後一行作為返回值
T2 apply(T1 data):將T1型別的資料傳入,返回T2型別資料,實現資料型別轉換
執行程式,可以看到如下結果:
flatMap操作符和concatMap操作符
4.使用flatMap將發射的一列資料列展開,單獨發射
例:使用flatMap的程式碼如下:
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//建立一個被觀察者,發出整型資料
val observable = createObservable()
//建立一個觀察者,接收字串資料
val observer = createObserver()
//建立訂閱
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.flatMap { origin ->
Observable.create<String> {
//將原始的一列資料展開,單獨用一個被觀察者發射出去
it.onNext("new emitter:$origin")
//當展開的所有被觀察者都完成後,原始被觀察者才會完成
it.onComplete()
}
}
.subscribe(observer)
}
private fun createObservable(): Observable<Int> {
return Observable.create {
for (i in 0..9) {
//通知觀察者執行onNext()方法
it.onNext(i)
}
//通知觀察者資料傳送完成
it.onComplete()
}
}
private fun createObserver(): Observer<String> {
return object : Observer<String> {
override fun onSubscribe(d: Disposable) {
text.append("subscribe\n")
}
override fun onNext(string: String) {
text.append("$string\n")
}
override fun onComplete() {
text.append("complete.")
}
override fun onError(e: Throwable) {
text.append(e.message)
}
}
}
}
上面的程式碼原始發射資料是0~9的一列數字,flatMap傳入一個原始資料,輸出一個被觀察者,也就是將單個數字單獨用一個被觀察者發射出去。執行以上程式,顯示如下:
當展開的所有被觀察者都完成後,原始的觀察者才會完成。
flatMap有個類似的方法concatMap,使用上沒有差別,區別在於flatMap展開後是無序的,concatMap展開後是有序的,使用concatMap的話,展開的一列被觀察者將會按照原始資料的順序依次發射
internal操作符
使用internal操作符實現Timer的效果,internal操作傳入三個引數:
initialDelay : 延遲多長時間開始
period : 間隔多長時間
unit : 時間單位
例:
class MainActivity : AppCompatActivity() {
private var disposable: Disposable? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//建立一個被觀察者,間隔1秒發射資料
val observable = createObservable()
//建立一個觀察者,接收資料
val observer = createObserver()
//建立訂閱
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
}
private fun createObservable(): Observable<Long> {
return Observable.interval(0, 1, TimeUnit.SECONDS)
}
private fun createObserver(): Observer<Long> {
return object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
text.append("subscribe\n")
disposable = d
}
override fun onNext(data: Long) {
text.append("$data\n")
if (data >= 9) {
disposable?.dispose()
onComplete()
}
}
override fun onComplete() {
text.append("complete.")
}
override fun onError(e: Throwable) {
text.append(e.message)
}
}
}
}
執行以上程式,顯示如下:
程式碼中可以看出,onSubscribe中的Disposable代表建立的訂閱,需要取消訂閱時,使用disposable.dispose方法即可。如果不取消訂閱,internal的onNext回撥將一直執行,直到當前程式程序被殺死。
由此可以看出,RxJava使用時一定要及時關閉訂閱,否則會導致記憶體洩漏問題,有一個比較方便的框架RxLifecycle,可以幫助我們在Activity生命週期中關閉訂閱
RxLifecycle
RxLifecycle的Github地址為:https://github.com/trello/RxLifecycle
使用
1.在build.gradle中匯入RxLifecycle
implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.2'
注:RxLifecycle中包含了RxJava包,所以匯入了RxLifecycle之後就可以不用匯入RxJava了,但是RxLifecycle不包含RxAndroid,所以還是需要匯入RxAndroid
2.我們先寫一個不加入RxLifecycle的訂閱,測試一下RxJava帶來的記憶體洩漏:
class MainActivity : AppCompatActivity() {
companion object {
const val TAG = "MainActivity"
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//建立一個被觀察者,間隔1秒發射資料
val observable = createObservable()
//建立一個觀察者,接收資料
val observer = createObserver()
//建立訂閱
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
}
private fun createObservable(): Observable<Long> {
return Observable.interval(0, 1, TimeUnit.SECONDS)
}
private fun createObserver(): Observer<Long> {
return object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
text.append("subscribe\n")
Log.d(TAG, "onSubscribe")
}
override fun onNext(data: Long) {
text.append("$data\n")
Log.d(TAG, "$data")
}
override fun onComplete() {
text.append("complete.")
Log.d(TAG, "onComplete")
}
override fun onError(e: Throwable) {
text.append(e.message)
Log.d(TAG, "onError")
}
}
}
override fun onDestroy() {
super.onDestroy()
Log.d(TAG, "onDestroy")
}
}
執行程式,可以看到如下Log:
com.example.studyrxjava D/MainActivity: onSubscribe
com.example.studyrxjava D/MainActivity: 0
com.example.studyrxjava D/MainActivity: 1
com.example.studyrxjava D/MainActivity: 2
com.example.studyrxjava D/MainActivity: 3
com.example.studyrxjava D/MainActivity: 4
com.example.studyrxjava D/MainActivity: 5
com.example.studyrxjava D/MainActivity: 6
com.example.studyrxjava D/MainActivity: 7
com.example.studyrxjava D/MainActivity: 8
com.example.studyrxjava D/MainActivity: onDestroy
com.example.studyrxjava D/MainActivity: 9
com.example.studyrxjava D/MainActivity: 10
com.example.studyrxjava D/MainActivity: 11
com.example.studyrxjava D/MainActivity: 12
com.example.studyrxjava D/MainActivity: 13
com.example.studyrxjava D/MainActivity: 14
com.example.studyrxjava D/MainActivity: 15
com.example.studyrxjava D/MainActivity: 16
com.example.studyrxjava D/MainActivity: 17
...
由此可見,在Activity已經onDestroy之後,RxJava的internal訂閱仍在執行,這就是記憶體洩漏了。
3.加上RxLifecycle修復此記憶體洩漏,程式碼如下
class MainActivity : RxAppCompatActivity() {
companion object {
const val TAG = "MainActivity"
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//建立一個被觀察者,間隔1秒發射資料
val observable = createObservable()
//建立一個觀察者,接收資料
val observer = createObserver()
//建立訂閱
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(bindUntilEvent(ActivityEvent.DESTROY))
.subscribe(observer)
}
private fun createObservable(): Observable<Long> {
return Observable.interval(0, 1, TimeUnit.SECONDS)
}
private fun createObserver(): Observer<Long> {
return object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
text.append("subscribe\n")
Log.d(TAG, "onSubscribe")
}
override fun onNext(data: Long) {
text.append("$data\n")
Log.d(TAG, "$data")
}
override fun onComplete() {
text.append("complete.")
Log.d(TAG, "onComplete")
}
override fun onError(e: Throwable) {
text.append(e.message)
Log.d(TAG, "onError")
}
}
}
override fun onDestroy() {
super.onDestroy()
Log.d(TAG, "onDestroy")
}
}
首先將繼承類由AppCompatActivity改為RxAppCompatActivity,這是使用RxLifecycle必需的,然後建立訂閱時加入.compose(bindUntilEvent(ActivityEvent.DESTROY))即可,這行程式碼需要加在subscribe(observer)呼叫之前
執行以上加入了RxLifecycle的程式碼,Log顯示如下:
com.example.studyrxjava D/MainActivity: onSubscribe
com.example.studyrxjava D/MainActivity: 0
com.example.studyrxjava D/MainActivity: 1
com.example.studyrxjava D/MainActivity: 2
com.example.studyrxjava D/MainActivity: 3
com.example.studyrxjava D/MainActivity: 4
com.example.studyrxjava D/MainActivity: 5
com.example.studyrxjava D/MainActivity: 6
com.example.studyrxjava D/MainActivity: onDestroy
com.example.studyrxjava D/MainActivity: onComplete
可以看到,onDestroy之後RxJava的internal訂閱就結束了。
以上,就是RxJava RxAndroid RxLifecycle基本使用。
原始碼已上傳:
https://github.com/wkxjc/StudyRxJava2