1. 程式人生 > >RxDownload2 原始碼解析(二)

RxDownload2 原始碼解析(二)

原始碼解析,如需轉載,請註明作者:Yuloran (t.cn/EGU6c76)

前言

造輪子者:Season_zlc

本文主要講述 RxDownload2 的執行緒排程

下載任務分發執行緒

顧名思義,就是分發下載任務的執行緒。該執行緒執行在 DownloadService 中,從業務上看,DownloadService應當僅被 start() & bind() 一次。任務分發執行緒,在 onBind() 時建立:

  1. start & bind service [-> RxDownload.java]
    /**
     * start and bind service.
     *
     * @param
callback Called when service connected. */
private void startBindServiceAndDo(final ServiceConnectedCallback callback) { Intent intent = new Intent(context, DownloadService.class); intent.putExtra(DownloadService.INTENT_KEY, maxDownloadNumber); context.startService(intent); context.bindService(intent, new
ServiceConnection() { @Override public void onServiceConnected(ComponentName name, IBinder binder) { DownloadService.DownloadBinder downloadBinder = (DownloadService.DownloadBinder) binder; downloadService = downloadBinder.getService(); context.unbindService(this
); bound = true; callback.call(); } @Override public void onServiceDisconnected(ComponentName name) { //注意!!這個方法只會在系統殺掉Service時才會呼叫!! bound = false; } }, Context.BIND_AUTO_CREATE); } 複製程式碼

上述程式碼有個細節,onServiceConnected() 中馬上調了 unbindService()

  1. onBind [-> DownloadService.java]
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        log("bind Download Service");
        startDispatch();
        return mBinder;
    }
複製程式碼
  1. startDispatch() [-> DownloadService.java]
    /**
     * start dispatch download queue.
     */
    private void startDispatch() {
        disposable = Observable
                .create(new ObservableOnSubscribe<DownloadMission>() {
                    @Override
                    public void subscribe(ObservableEmitter<DownloadMission> emitter) throws Exception {
                        DownloadMission mission;
                        while (!emitter.isDisposed()) {
                            try {
                                log(WAITING_FOR_MISSION_COME);
                                mission = downloadQueue.take();
                                log(Constant.MISSION_COMING);
                            } catch (InterruptedException e) {
                                log("Interrupt blocking queue.");
                                continue;
                            }
                            emitter.onNext(mission);
                        }
                        emitter.onComplete();
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Consumer<DownloadMission>() {
                    @Override
                    public void accept(DownloadMission mission) throws Exception {
                        mission.start(semaphore);
                    }
                });
    }
複製程式碼
  • 上述程式碼就是下載任務分發執行緒的實現。其中 .subscribeOn(Schedulers.newThread()) 表明該執行緒通過 new Thread() 的方式產生的
  • 下載任務用一個阻塞佇列來維護,阻塞佇列內部已經實現了同步機制,所以無需擔心併發問題
  • 只要沒有取消訂閱,該執行緒就會不停的嘗試從阻塞佇列中獲取下載任務併發射。如果佇列為空,就會一直阻塞,直到有新任務入隊
  • 上述程式碼是不嚴謹的,對 disposable 重新賦值前,沒有先嚐試對其取消訂閱。如果多次呼叫 bindService() ,就會出現執行緒洩露

下載任務執行執行緒

顧名思義,就是下載任務的執行執行緒。該執行緒執行在 Schedulers.io() 執行緒池上。入參訊號量用來限制同時下載的最大任務數。

  1. start(final Semaphore semaphore) [-> SingleMission.java]
    @Override
    public void start(final Semaphore semaphore) {
        disposable = start(bean, semaphore, new MissionCallback() {
            @Override
            public void start() {
                // 回撥開始下載
                if (callback != null) callback.start();
            }

            @Override
            public void next(DownloadStatus value) {
                // 回撥下載中
                status = value;
                processor.onNext(started(value));
                if (callback != null) callback.next(value);
            }

            @Override
            public void error(Throwable throwable) {
                // 回撥下載失敗
                processor.onNext(failed(status, throwable));
                if (callback != null) callback.error(throwable);
            }

            @Override
            public void complete() {
                // 回撥下載完成
                processor.onNext(completed(status));
                if (callback != null) callback.complete();
            }
        });
    }
複製程式碼
  1. start(DownloadBean bean, final Semaphore semaphore, final MissionCallback callback) [-> DownloadMission.java]
    protected Disposable start(DownloadBean bean, final Semaphore semaphore,
                               final MissionCallback callback) {
        return rxdownload.download(bean)
                .subscribeOn(Schedulers.io()) // 指定下載任務執行執行緒
                .doOnLifecycle(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        if (canceled.get()) {
                            dispose(disposable);
                        }

                        log(TRY_TO_ACQUIRE_SEMAPHORE);
                        // 申請訊號量
                        semaphore.acquire();
                        log(ACQUIRE_SUCCESS);
                        
                        // 獲得訊號量後,需再次檢測是否已經暫停下載
                        if (canceled.get()) {
                            // 已經暫停,則取消訂閱,釋放訊號量
                            dispose(disposable);
                        } else {
                            callback.start();
                        }
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        // 取消訂閱時,需要釋放訊號量
                        semaphore.release();
                    }
                })
                .subscribe(new Consumer<DownloadStatus>() {
                    @Override
                    public void accept(DownloadStatus value) throws Exception {
                         // 回撥下載進度
                        callback.next(value);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        // 回撥下載失敗
                        callback.error(throwable);
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        // 回撥下載完成
                        callback.complete();
                    }
                });
    }
複製程式碼

下載任務中斷執行緒

顧名思義,就是中斷下載任務的執行緒。包括暫停、刪除、全部暫停、全部取消四個操作。這些操作也執行在 Schedulers.io() 執行緒池上。

  1. pauseServiceDownload(final String missionId) [-> RxDownlaod.java]
    /**
     * Pause download.
     * <p>
     * Pause a url or all tasks belonging to missionId.
     *
     * @param missionId url or missionId
     */
    public Observable<?> pauseServiceDownload(final String missionId) {
        // createGeneralObservable 是一個非同步繫結下載服務的Observable,通過資源數為1的訊號量實現強制同步
        return createGeneralObservable(new GeneralObservableCallback() {
            @Override
            public void call() {
                // 服務繫結後,呼叫服務的暫停下載
                downloadService.pauseDownload(missionId);
            }
        }).observeOn(AndroidSchedulers.mainThread());
    }
複製程式碼
  1. createGeneralObservable(final GeneralObservableCallback callback) [-> RxDownload.java]
    /**
     * return general observable
     *
     * @param callback Called when observable created.
     * @return Observable
     */
    private Observable<?> createGeneralObservable(final GeneralObservableCallback callback) {
        // 方法名起的不好,應該叫 bindService
        return Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
                if (!bound) {
                    // 因為 onServiceConnected 是非同步回撥的,所以這裡用了個資源數為1的訊號量實現強制同步(CountDownLatch也可以實現強制同步)
                    semaphore.acquire();
                    if (!bound) {
                        startBindServiceAndDo(new ServiceConnectedCallback() {
                            @Override
                            public void call() {
                                // 服務繫結後,回撥 callback
                                doCall(callback, emitter);
                                // 釋放訊號量
                                semaphore.release();
                            }
                        });
                    } else {
                        doCall(callback, emitter);
                        semaphore.release();
                    }
                } else {
                    doCall(callback, emitter);
                }
            }
        }).subscribeOn(Schedulers.io()); // 指定在 io 執行緒執行,所以暫停下載也是在這個執行緒執行
    }
複製程式碼

同理,刪除下載也會先呼叫 createGeneralObservable(),所以刪除操作也是在 Schedulers.io() 上執行的。

總結

  • 一個單獨的執行緒用來分發下載任務:Schedulers.newThread()
  • 每次從執行緒池中取一個執行緒執行下載任務:Schedulers.io()
  • 每次從執行緒池中取一個執行緒執行暫停、刪除下載任務:Schedulers.io()