RxDownload2 原始碼解析(二)
阿新 • • 發佈:2019-01-04
原始碼解析,如需轉載,請註明作者:Yuloran (t.cn/EGU6c76)
前言
造輪子者:Season_zlc
本文主要講述 RxDownload2
的執行緒排程
下載任務分發執行緒
顧名思義,就是分發下載任務的執行緒。該執行緒執行在 DownloadService
中,從業務上看,DownloadService
應當僅被 start() & bind()
一次。任務分發執行緒,在 onBind()
時建立:
- start & bind service [-> RxDownload.java]
/**
* start and bind service.
*
* @param callback Called when service connected.
*/
private void startBindServiceAndDo(final ServiceConnectedCallback callback) {
Intent intent = new Intent(context, DownloadService.class);
intent.putExtra(DownloadService.INTENT_KEY, maxDownloadNumber);
context.startService(intent);
context.bindService(intent, new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder binder) {
DownloadService.DownloadBinder downloadBinder
= (DownloadService.DownloadBinder) binder;
downloadService = downloadBinder.getService();
context.unbindService(this );
bound = true;
callback.call();
}
@Override
public void onServiceDisconnected(ComponentName name) {
//注意!!這個方法只會在系統殺掉Service時才會呼叫!!
bound = false;
}
}, Context.BIND_AUTO_CREATE);
}
複製程式碼
上述程式碼有個細節,onServiceConnected()
中馬上調了 unbindService()
。
- onBind [-> DownloadService.java]
@Nullable
@Override
public IBinder onBind(Intent intent) {
log("bind Download Service");
startDispatch();
return mBinder;
}
複製程式碼
- startDispatch() [-> DownloadService.java]
/**
* start dispatch download queue.
*/
private void startDispatch() {
disposable = Observable
.create(new ObservableOnSubscribe<DownloadMission>() {
@Override
public void subscribe(ObservableEmitter<DownloadMission> emitter) throws Exception {
DownloadMission mission;
while (!emitter.isDisposed()) {
try {
log(WAITING_FOR_MISSION_COME);
mission = downloadQueue.take();
log(Constant.MISSION_COMING);
} catch (InterruptedException e) {
log("Interrupt blocking queue.");
continue;
}
emitter.onNext(mission);
}
emitter.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<DownloadMission>() {
@Override
public void accept(DownloadMission mission) throws Exception {
mission.start(semaphore);
}
});
}
複製程式碼
- 上述程式碼就是下載任務分發執行緒的實現。其中
.subscribeOn(Schedulers.newThread())
表明該執行緒通過 new Thread() 的方式產生的 - 下載任務用一個阻塞佇列來維護,阻塞佇列內部已經實現了同步機制,所以無需擔心併發問題
- 只要沒有取消訂閱,該執行緒就會不停的嘗試從阻塞佇列中獲取下載任務併發射。如果佇列為空,就會一直阻塞,直到有新任務入隊
- 上述程式碼是不嚴謹的,對
disposable
重新賦值前,沒有先嚐試對其取消訂閱。如果多次呼叫bindService()
,就會出現執行緒洩露
下載任務執行執行緒
顧名思義,就是下載任務的執行執行緒。該執行緒執行在 Schedulers.io()
執行緒池上。入參訊號量用來限制同時下載的最大任務數。
- start(final Semaphore semaphore) [-> SingleMission.java]
@Override
public void start(final Semaphore semaphore) {
disposable = start(bean, semaphore, new MissionCallback() {
@Override
public void start() {
// 回撥開始下載
if (callback != null) callback.start();
}
@Override
public void next(DownloadStatus value) {
// 回撥下載中
status = value;
processor.onNext(started(value));
if (callback != null) callback.next(value);
}
@Override
public void error(Throwable throwable) {
// 回撥下載失敗
processor.onNext(failed(status, throwable));
if (callback != null) callback.error(throwable);
}
@Override
public void complete() {
// 回撥下載完成
processor.onNext(completed(status));
if (callback != null) callback.complete();
}
});
}
複製程式碼
- start(DownloadBean bean, final Semaphore semaphore, final MissionCallback callback) [-> DownloadMission.java]
protected Disposable start(DownloadBean bean, final Semaphore semaphore,
final MissionCallback callback) {
return rxdownload.download(bean)
.subscribeOn(Schedulers.io()) // 指定下載任務執行執行緒
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
if (canceled.get()) {
dispose(disposable);
}
log(TRY_TO_ACQUIRE_SEMAPHORE);
// 申請訊號量
semaphore.acquire();
log(ACQUIRE_SUCCESS);
// 獲得訊號量後,需再次檢測是否已經暫停下載
if (canceled.get()) {
// 已經暫停,則取消訂閱,釋放訊號量
dispose(disposable);
} else {
callback.start();
}
}
}, new Action() {
@Override
public void run() throws Exception {
// 取消訂閱時,需要釋放訊號量
semaphore.release();
}
})
.subscribe(new Consumer<DownloadStatus>() {
@Override
public void accept(DownloadStatus value) throws Exception {
// 回撥下載進度
callback.next(value);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
// 回撥下載失敗
callback.error(throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
// 回撥下載完成
callback.complete();
}
});
}
複製程式碼
下載任務中斷執行緒
顧名思義,就是中斷下載任務的執行緒。包括暫停、刪除、全部暫停、全部取消四個操作。這些操作也執行在 Schedulers.io()
執行緒池上。
- pauseServiceDownload(final String missionId) [-> RxDownlaod.java]
/**
* Pause download.
* <p>
* Pause a url or all tasks belonging to missionId.
*
* @param missionId url or missionId
*/
public Observable<?> pauseServiceDownload(final String missionId) {
// createGeneralObservable 是一個非同步繫結下載服務的Observable,通過資源數為1的訊號量實現強制同步
return createGeneralObservable(new GeneralObservableCallback() {
@Override
public void call() {
// 服務繫結後,呼叫服務的暫停下載
downloadService.pauseDownload(missionId);
}
}).observeOn(AndroidSchedulers.mainThread());
}
複製程式碼
- createGeneralObservable(final GeneralObservableCallback callback) [-> RxDownload.java]
/**
* return general observable
*
* @param callback Called when observable created.
* @return Observable
*/
private Observable<?> createGeneralObservable(final GeneralObservableCallback callback) {
// 方法名起的不好,應該叫 bindService
return Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
if (!bound) {
// 因為 onServiceConnected 是非同步回撥的,所以這裡用了個資源數為1的訊號量實現強制同步(CountDownLatch也可以實現強制同步)
semaphore.acquire();
if (!bound) {
startBindServiceAndDo(new ServiceConnectedCallback() {
@Override
public void call() {
// 服務繫結後,回撥 callback
doCall(callback, emitter);
// 釋放訊號量
semaphore.release();
}
});
} else {
doCall(callback, emitter);
semaphore.release();
}
} else {
doCall(callback, emitter);
}
}
}).subscribeOn(Schedulers.io()); // 指定在 io 執行緒執行,所以暫停下載也是在這個執行緒執行
}
複製程式碼
同理,刪除下載也會先呼叫 createGeneralObservable()
,所以刪除操作也是在 Schedulers.io()
上執行的。
總結
- 一個單獨的執行緒用來分發下載任務:
Schedulers.newThread()
- 每次從執行緒池中取一個執行緒執行下載任務:
Schedulers.io()
- 每次從執行緒池中取一個執行緒執行暫停、刪除下載任務:
Schedulers.io()