1. 程式人生 > >一起來造一個RxJava,揭祕RxJava的實現原理

一起來造一個RxJava,揭祕RxJava的實現原理

RxJava是一個神奇的框架,用法很簡單,但內部實現有點複雜,程式碼邏輯有點繞。我讀原始碼時,確實有點似懂非懂的感覺。網上關於RxJava原始碼分析的文章,原始碼貼了一大堆,程式碼邏輯繞來繞去的,讓人看得雲裡霧裡的。既然用拆輪子的方式來分析原始碼比較難啃,不如換種方式,以造輪子的方式,將原始碼中與效能、相容性、擴充套件性有關的程式碼剔除,留下核心程式碼帶大家揭祕RxJava的實現原理。

什麼是響應式程式設計

首先,我們需要明確,RxJava是Reactive Programming在Java中的一種實現。什麼是響應式程式設計? 
用一個字來概括就是流(Stream)。Stream 就是一個按時間排序的 Events 序列

,它可以放射三種不同的 Events:(某種型別的)Value、Error 或者一個” Completed” Signal。通過分別為 Value、Error、”Completed”定義事件處理函式,我們將會非同步地捕獲這些 Events。基於觀察者模式,事件流將從上往下,從訂閱源傳遞到觀察者。

至於使用Rx框架的優點,它可以避免回撥巢狀,更優雅地切換執行緒實現非同步處理資料。配合一些操作符,可以讓處理事件流的程式碼更加簡潔,邏輯更加清晰。

搭建大體的框架

要造一座房子,首先要把大體的框架搭好。在RxJava裡面,有兩個必不可少的角色:Subscriber(觀察者) 和 Observable(訂閱源)。

Subscriber(觀察者)

Subsribler在RxJava裡面是一個抽象類,它實現了Observer介面。

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable t);
    void onNext(T var1);
}

為了儘可能的簡單,將Subscriber簡化如下:

public abstract class Subscriber<T> implements Observer<T> {
    public void onStart() {
    }
}


Observable(訂閱源)

Observable(訂閱源)在RxJava裡面是一個大而雜的類,擁有很多工廠方法和各式各樣的操作符。每個Observable裡面有一個OnSubscribe物件,只有一個方法(void call(Subscriber<? super T> subscriber);),用來產生資料流,這是典型的命令模式。

public class Observable<T> {
    final OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
        return new Observable<T>(onSubscribe);
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
    }

    public interface OnSubscribe<T> {
        void call(Subscriber<? super T> subscriber);
    }
}

實踐

到此,一個小型的RxJava的雛形就出來了。不信?我們來實踐一下吧

 Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i < 10; i++) {
                    subscriber.onNext(i);
                }
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }
            @Override
            public void onError(Throwable t) {

            }
            @Override
            public void onNext(Integer var1) {
                System.out.println(var1);
            }
        });

新增操作符

其實,強大的RxJava的核心原理並沒有想象中那麼複雜和神祕,運用的就是典型的觀察者模式。有了基本雛形之後,我們繼續為這個框架添磚加瓦吧。RxJava之所以強大好用,與其擁有豐富靈活的操作符是分不開的。那麼我們就試著為這個框架新增一個最常用的操作符:map。

那麼RxJava是如何實現操作符的呢?其實,每呼叫一次操作符的方法,就相當於在上層資料來源和下層觀察者之間橋接了一個新的Observable。橋接的Observable內部會例項化有新的OnSuscribe和Subscriber。OnSuscribe負責接受目標Subscriber傳來的訂閱請求,並呼叫源Observable.OnSubscribe的subscribe方法。源Observable.OnSubscribe將Event往下發送給橋接Observable.Subscriber,最終橋接Observable.Subscriber將Event做相應處理後轉發給目標Subscriber。流程如下圖所示:

這裡寫圖片描述

接著,我們用程式碼實現這一過程。在Observable類裡面新增如下程式碼:

public <R> Observable<R> map(Transformer<? super T, ? extends R> transformer) {
        return create(new OnSubscribe<R>() { // 生成一個橋接的Observable和 OnSubscribe
            @Override
            public void call(Subscriber<? super R> subscriber) {
                Observable.this.subscribe(new Subscriber<T>() { // 訂閱上層的Observable
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }
                    @Override
                    public void onError(Throwable t) {
                        subscriber.onError(t);
                    }
                    @Override
                    public void onNext(T var1) {
                        // 將上層的onSubscribe傳送過來的Event,通過轉換和處理,轉發給目標的subscriber
                        subscriber.onNext(transformer.call(var1));
                    }
                });
            }
        });
    }
    public interface Transformer<T, R> {
        R call(T from);
    }

map操作符的作用是將T型別的Event轉化成R型別,轉化策略抽象成Transformer<T, R>(RxJava中用的是Func1<T, R>,但為了便於理解,起了一個有意義的名字)這一個函式介面,由外部傳入。

上面程式碼中使用到一些泛型的萬用字元,有些地方使用了super,有些地方使用了extends,其實這是有講究的,傳給Transformer#call方法的引數是T型別的,那麼call方法的引數型別可以宣告成是T的父類,Transformer#call方法的返回值要求是R型別的,那麼它的返回值型別應該宣告成R的子類。如果大家不能理解,也可以不用在意這些細節。

那麼我們一起來測試一下吧。

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i < 10; i++) {
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Transformer<Integer, String>() {
            @Override
            public String call(Integer from) {
                return "maping " + from;
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String var1) {
                System.out.println(var1);
            }
            @Override
            public void onCompleted() {}
            @Override
            public void onError(Throwable t) {}
        });

但是,我們看到map()方法內內部類有點多,程式碼缺少拓展性和可讀性,我們應該進行適當地重構,將主要的邏輯抽離成獨立的模組,並保證模組間儘量解耦,否則Observable只會越來越臃腫。

public <R> Observable<R> map(Transformer<? super T, ? extends R> transformer) {
        return create(new MapOnSubscribe<T, R>(this, transformer));
    }
public class MapOnSubscribe<T, R> implements Observable.OnSubscribe<R> {
    final Observable<T> source;
    final Observable.Transformer<? super T, ? extends R> transformer;
    public MapOnSubscribe(Observable<T> source, Observable.Transformer<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }
    @Override
    public void call(Subscriber<? super R> subscriber) {
        source.subscribe(new MapSubscriber<R, T>(subscriber, transformer));
    }
}
public class MapSubscriber<T, R> extends Subscriber<R> {
    final Subscriber<? super T> actual;
    final Observable.Transformer<? super R, ? extends T> transformer;
    public MapSubscriber(Subscriber<? super T> actual, Observable.Transformer<? super R, ? extends T> transformer) {
        this.actual = actual;
        this.transformer = transformer;
    }
    @Override
    public void onCompleted() {
        actual.onCompleted();
    }
    @Override
    public void onError(Throwable t) {
        actual.onError(t);
    }
    @Override
    public void onNext(R var1) {
        actual.onNext(transformer.call(var1));
    }
}

新增執行緒切換功能

RxJava中最激動人心的功能是非同步處理,能夠自如地切換執行緒。利用 subscribeOn() 結合 observeOn() 來實現執行緒控制,讓事件的產生和消費發生在不同的執行緒。 observeOn() 可以多次呼叫,實現了執行緒的多次切換,最終目標Subscriber的執行執行緒與最後一次observeOn()的呼叫有關。但subscribeOn() 多次呼叫只有第一個subscribeOn() 起作用。為什麼呢?因為 observeOn() 作用的是Subscriber,而subscribeOn() 作用的是OnSubscribe。

這裡借用扔物線的圖:

這裡寫圖片描述

簡單地呼叫一個方法就可以完成執行緒的切換,很神奇對吧。RxJava是如何實現的呢?除了橋接Observable以外,RxJava還用到一個很關鍵的類—Scheduler(排程器)。文件中給Scheduler的定義是:A Scheduler is an object that schedules units of work.,也就是進行任務的排程的一個東西。Scheduler裡面有一個重要的抽象方法:

public abstract Worker createWorker();

Worker是Scheduler的內部類,它是具體任務的執行者。當要提交任務給Worker執行需要呼叫Worker的schedule(Action0 aciton)方法。

public abstract Subscription schedule(Action0 action);

要獲得一個Scheduler並不需要我們去new,一般是呼叫Schedulers的工廠方法。

public final class Schedulers {
    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final Scheduler newThreadScheduler;
    public static Scheduler io() {
        return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
    }
    public static Scheduler computation() {
        return RxJavaHooks.onComputationScheduler(getInstance().computationScheduler);
    }
  ...
}

具體的Scheduler的實現類就不帶大家一起看了,但我們需要知道,能做到執行緒切換的關鍵Worker的schedule方法,因為它會把傳過來的任務放入執行緒池,或新執行緒中執行,這取決於具體Scheduler的實現。

自定義Scheduler

那麼,下面我們先來自定義一個簡單的Scheduler和Worker。

public class Scheduler {
    final Executor executor;
    public Scheduler(Executor executor) {
        this.executor = executor;
    }
    public Worker createWorker() {
        return new Worker(executor);
    }
    public static class Worker {
        final Executor executor;
        public Worker(Executor executor) {
            this.executor = executor;
        }
      // 這裡接受的是Runnable而不是Action0,其實這沒什麼關係,主要是懶得自定義函式式介面了。
        public void schedule(Runnable runnable) {
            executor.execute(runnable);
        }
    }
}

為了達到高仿效果,我們也提供相應的工廠方法。

public class Schedulers {
    private static final Scheduler ioScheduler = new Scheduler(Executors.newSingleThreadExecutor());
    public static Scheduler io() {
        return ioScheduler;
    }
}

實現subscribeOn

subscribeOn是作用於上層OnSubscribe的,可以讓OnSubscribe的call方法在新執行緒中執行。

因此,在Observable類裡面,新增如下程式碼:

public Observable<T> subscribeOn(Scheduler scheduler) {
        return Observable.create(new OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                subscriber.onStart();
                // 將事件的生產切換到新的執行緒。
                scheduler.createWorker().schedule(new Runnable() {
                    @Override
                    public void run() {
                        Observable.this.onSubscribe.call(subscriber);
                    }
                });
            }
        });
    }

測試一下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("[email protected] "+Thread.currentThread().getName()); //new Thread
                subscriber.onNext(1);
            }})
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<Integer>() {
                  ...
                    @Override
                    public void onNext(Integer var1) {
                        System.out.println("[email protected] "+Thread.currentThread().getName()); // new Thread
                        System.out.println(var1);
                    }
                });

實現observeOn

subscribeOn是作用於下層Subscriber的,需要讓下層Subscriber的事件處理方法放到新執行緒中執行。

為此,在Observable類裡面,新增如下程式碼:

public Observable<T> observeOn(Scheduler scheduler) {
        return Observable.create(new OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                subscriber.onStart();
                Scheduler.Worker worker = scheduler.createWorker();
                Observable.this.onSubscribe.call(new Subscriber<T>() {
                    @Override
                    public void onCompleted() {
                        worker.schedule(new Runnable() {
                            @Override
                            public void run() {
                                subscriber.onCompleted();
                            }
                        });
                    }
                    @Override
                    public void onError(Throwable t) {
                        worker.schedule(new Runnable() {
                            @Override
                            public void run() {
                                subscriber.onError(t);
                            }
                        });
                    }
                    @Override
                    public void onNext(T var1) {
                        worker.schedule(new Runnable() {
                            @Override
                            public void run() {
                                subscriber.onNext(var1);
                            }
                        });
                    }
                });
            }
        });
    }

測試一下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("[email protected] " + Thread.currentThread().getName()); // main
                subscriber.onNext(1);
            }
        })
                .observeOn(Schedulers.io())
                .subscribe(new Subscriber<Integer>() {
                  ...
                    @Override
                    public void onNext(Integer var1) {
                        System.out.println("[email protected] " + Thread.currentThread().getName()); // new Thread
                        System.out.println(var1);
                    }
                });

在Android中切換執行緒

經過以上實踐,我們終於知道了RxJava執行緒切換的核心原理了。下面我們順便來看看Android裡面是如何進行執行緒切換的。

首先找到AndroidSchedulers,發現一個Scheduler的具體實現類:LooperScheduler。

private AndroidSchedulers() {
    ...
            mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
            ...
    }
    /** A {@link Scheduler} which executes actions on the Android UI thread. */
    public static Scheduler mainThread() {
        return getInstance().mainThreadScheduler;
    }

LooperScheduler的程式碼很清晰,內部持有一個Handler,用於執行緒的切換。在Worker的schedule(Action0 action,...)方法中,將action通過Handler切換到所繫結的執行緒中執行。

class LooperScheduler extends Scheduler {
    private final Handler handler;

    LooperScheduler(Looper looper) {
        handler = new Handler(looper);
    }

    LooperScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    static class HandlerWorker extends Worker {
        private final Handler handler;

      ...

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
        ...
            action = hook.onSchedule(action);

            ScheduledAction scheduledAction = new ScheduledAction(action, handler);

            Message message = Message.obtain(handler, scheduledAction);
            message.obj = this; // Used as token for unsubscription operation.

            handler.sendMessageDelayed(message, unit.toMillis(delayTime));
        ...
            return scheduledAction;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }
    }

    static final class ScheduledAction implements Runnable, Subscription {
        private final Action0 action;
        private final Handler handler;
        private volatile boolean unsubscribed;
      ...
        @Override public void run() {
            try {
                action.call();
            } ...
        }
        ...
    }
}

結語

就這樣,以上用程式碼演示了RxJava一些核心功能是如何實現的,希望能給大家帶來不一樣的啟發。但這只是一個小小的Demo,離真正能運用於工程的Rx框架還差太遠。這也讓我們明白到,一個健壯的框架,需要考慮太多東西,比如程式碼的可拓展性和可讀性,效能優化,可測試性,相容性,極端情況等等。但有時要想深入理解一個複雜框架的實現原理,就需要剝離這些細節程式碼,多關注主幹的呼叫邏輯,化繁為簡。

參考&拓展

相關推薦

起來一個RxJava揭祕RxJava實現原理

RxJava是一個神奇的框架,用法很簡單,但內部實現有點複雜,程式碼邏輯有點繞。我讀原始碼時,確實有點似懂非懂的感覺。網上關於RxJava原始碼分析的文章,原始碼貼了一大堆,程式碼邏輯繞來繞去的,讓人看得雲裡霧裡的。既然用拆輪子的方式來分析原始碼比較難啃,不如換種方式,以造輪

linux需要裝?那就和我起來配置一個動態的MOTD登陸效果吧

網址 描述 signature cti info 制作 退出 分享 rep 說到motd,熟悉linux系統的同學應該熟悉,motd它是linux系統上的一個普通的文件,在linux系統上的/etc/motd文件 .作用於:每次系統需要登陸時,通過終端展示一些消息給登陸用戶

起來一個簡易的手風琴效果

今天我們來寫一個基於JQ的手風琴效果。如圖: html部分很簡單。 <div id="main"> <ul> <li class="active"> <h3>白玫瑰

java程式: 輸入數量不確定的正數和負數(次輸入一個 然後列印正數和負數的個數當輸入0時程式停止

import java.util.Scanner; public class CountNumber { /* * 輸入數量不確定的正數和負數(一次輸入一個), 然後列印正數和負

「2.0」個人開發一個App小程式從0到1檔案剖析

不知你是不是見到“檔案剖析”這4個大字,才點進來看一看的?如果真是的話,那我可以坦誠、真心、負責任地告訴你:你上當了,你上了賊船啦,如果你現在想跳的話,還來得及,反正茫茫大海中,鯊魚正缺搞程式的人。說你上當了,是因為我根本就剖析不了。我頂多只能做到分析。 2.1 配置檔案分析 趕快開啟微信開發工具,快速進到專

[3.0] 個人開發一個App小程式從0到1刪減新增

在這個黃道吉日,咱們將要幹一件,驚天地泣鬼神,婦孺皆知的大事,那就是刪掉微信開發工具自動生成的原始碼。 刪掉pages下的index、logs目錄,啥都不留; 刪掉utils下的util.js,只流空氣; 刪掉app.js裡的程式碼,只留App({}); 刪掉app.wxss裡的內容,只留下一地雞毛; 刪掉s

個人開發一個產品小程式從0到1第3章 應用檔案

一個小程式專案,在根目錄下會有3個應用檔案,一個是全域性業務邏輯檔案app.js,一個是公共配置檔案app.json,還有一個是公共樣式表文件app.wxss。在這3個檔案中,app.js和app.json是不可刪除,是必須有的檔案。 3.1 app.js 開啟檔案的那一瞬間,我才知道:人生遇到的人很多,但真

個人開發一個產品小程式從0到1第4章 頁面檔案

一個小程式應用,由index,logs等多個頁面組成。一個頁面,包含4個同名,不同字尾的檔案,它們分別是負責配置的json,佈局的wxml,樣式的wxss和業務邏輯的js。其中,wxml和js是不可刪除,是必須有的檔案。 4.1 index.json 每一個小程式頁面也可以使用.json檔案來對本頁面的視窗表

一個工具幫你實現酷炫的資料視覺化

大部分人眼裡,資料視覺化可以是這樣的:   但其實,它也可以做成這樣:       看著酷炫的視覺化,有人會疑惑,是否有實實在在的用處。其實在商業、生產和運營場景中有大量類似的使用,如集團展覽中心——

手寫一個Redux深入理解其原理

Redux可是一個大名鼎鼎的庫,很多地方都在用,我也用了幾年了,今天這篇文章就是自己來實現一個Redux,以便於深入理解他的原理。我們還是老套路,從基本的用法入手,然後自己實現一個Redux來替代原始碼的NPM包,但是功能保持不變。本文只會實現Redux的核心庫,跟其他庫的配合使用,比如React-Redux

TiDB EcoSystem Tools 原理解讀():TiDB-Binlog 架構演進與實現原理

簡介 TiDB-Binlog 元件用於收集 TiDB 的 binlog,並提供實時備份和同步功能。該元件在功能上類似於 MySQL 的主從複製,MySQL 的主從複製依賴於記錄的 binlog 檔案,TiDB-Binlog 元件也是如此,主要的不同點是 TiDB 是分散式的,因此需要收集各個 TiDB 例項產

篇文章通透理解序列號實現原理

                     1.序列號的本質序列號等價於註冊碼,是軟體發行商的一種維權手段,也就是正版軟體的一個身份證。本質:防止盜版、按功能收費等。 目前,商用軟體和共享軟體絕大部份都是採用註冊碼授權的方式來保證軟體本身不被盜用,以保證自身的利益。儘管很多常用的許多軟體系統的某些版本已經被別人

揭祕 HashMap 實現原理(Java 8)

HashMap 作為一種容器型別,無論你是否瞭解過其內部的實現原理,它的大名已經頻頻出現在各種網際網路面試中了。從基本的使用角度來說,它很簡單,但從其內部的實現來看(尤其是 Java 8 的改進以來),它又並非想象中那麼容易。如果你一定要問了解其內部實現與否對於寫程式究竟有多大影響,我不能給出一個確切的答案。

篇文章了解Consul服務發現實現原理

轉換 圖片 官網 上傳 無序 sta 應用 節點數 編碼 從 2016 年起就開始接觸 Consul,使用的主要目的就是做服務發現,後來逐步應用於生產環境,並總結了少許使用經驗。 最開始使用 Consul 的人不多,這兩年微服務越來越火,使用 Consul

()Spring IoC原始碼-3.其他特性的實現原理-01lazy-init與預例項化

前面的文章也提到過,IOC容器初始化過程一般不包含Bean載入的實現。Bean載入一般發生在應用第一次通過getBean向容器索取Bean的時候。但有一個例外:如果在XML檔案中為Bean定義了lazy-init屬性,那麼Bean的載入在IOC容器初始化時就

()Spring IoC原始碼-3.其他特性的實現原理-02迴圈依賴的解決

引言:迴圈依賴就是N個類中迴圈巢狀引用,如果在日常開發中我們用new 物件的方式發生這種迴圈依賴的話程式會在執行時一直迴圈呼叫,直至記憶體溢位報錯。下面說一下Spring是如果解決迴圈依賴的。 第一種:構造器引數迴圈依賴 Spring容器會將每一個正

【Big Data 每日題20181031】深入分析volatile的實現原理

通過前面一章我們瞭解了synchronized是一個重量級的鎖,雖然JVM對它做了很多優化,而下面介紹的volatile則是輕量級的synchronized。如果一個變數使用volatile,則它比使用synchronized的成本更加低,因為它不會引起執行緒上下文的切換

網路程式設計()——淺析web伺服器與瀏覽器的實現原理

我們基本每天都在通過WEB瀏覽器,去瀏覽一些新聞,看看視訊之類的。眾所周知,這就是所謂的B/S結構(Browser/Server,瀏覽器/伺服器模式),是WEB興起後的一種網路結構模式,WEB瀏覽器是客戶端最主要的應用軟體。那順道就來簡單的看一下,所謂的Web伺服器(例如知名

epoll為什麼這麼快epoll的實現原理

以一個生活中的例子來解釋. 假設你在大學中讀書,要等待一個朋友來訪,而這個朋友只知道你在A號樓,但是不知道你具體住在哪裡,於是你們約好了在A號樓門口見面. 如果你使用的阻塞IO模型來處理這個問題,那麼你就只能一直守候在A號樓門口等待朋友的到來,在這段時間裡你不能做別的事情,