1. 程式人生 > >初步瞭解響應式框架——agera

初步瞭解響應式框架——agera

Google在上週開源了一個響應式框架——agera,相信它會慢慢地被廣大程式設計師所熟知。我個人對這樣的技術是很感興趣的,在這之前也研究過RxJava,所以在得知Google開源了這樣的框架之後第一時間進行了學習,這裡算是把學習的心得和大家分享。當然由於本人水平有限,這篇文章可能起的更多的作用是拋磚引玉,希望有更多的大神能加入到學習agera的大部隊中,爭取早日出現幾篇讓人信服的文章!

通過這篇文章你可能會學習到:

  1. agera是什麼,也就是它的基本概念和大體框架
  2. agera的基礎用法
  3. agera的進階用法
  4. agera的原始碼分析
  5. 如何封裝agera
  6. agera和RxJava的區別

好了,讓我們正式開啟agera的學習之旅吧。

Agera

agera是什麼

回答agera是什麼之前,我們要先了解什麼是響應式程式設計和函數語言程式設計,這裡我不展開講了,大家可以自行去Google或者wiki。在agera出現之前,Java上已經有了一個很著名的同類型框架,叫做RxJava,其衍生出的更適合Android的版本RxAndroid和各種層出不窮的“兒子”類似RxBus,RxBinding等等都讓人眼前一亮,那Google為什麼還要去寫一個agera呢?這個我也不好回答,畢竟我不是寫這個框架的人啊,不過有一點可以確定的是,作為Google的“親兒子”,它在Android中擁有的潛力和發揮的威力必定是很大的,個人覺得在馬上就要舉行的I/O大會上,這個框架會被拿出來講解。

好了,下面讓我們具體說下agera吧,下面一段話摘自agera的GitHub主頁。

agera is a set of classes and interfaces to help wirte functional,asynchronous and reactive applications for Android.Requires Android SDK version 9 or higher.

簡單的翻譯下,就是說agera是一個能幫助Android開發者更好的開發函式式,非同步和響應式程式的框架,要求Android的SDK版本在9以上。

在瞭解agera是什麼之後,我們還需要明白一點的就是,它和RxJava一樣,是基於觀察者模式開發的,所以其中會有一些概念,我在後文中會一一進行闡述。

agera的基礎用法

講完了agera是什麼以後,大家有沒有躍躍欲試了呢?下面就讓我帶大家來了解一下agera最基礎的用法吧。

首先,我們要明確,既然agera是基於觀察者模式的,那它其中的觀察者,被觀察者等是用什麼來表現的呢?

在agera中,有兩個基本的概念:Observable和Updatable。

Observable & Updatable

public interface Observable {

  /**
   * Adds {@code updatable} to the {@code Observable}.
   *
   * @throws IllegalStateException if the {@link Updatable} was already added or if it was called
   * from a non-Looper thread
   */
  void addUpdatable(@NonNull Updatable updatable);

  /**
   * Removes {@code updatable} from the {@code Observable}.
   *
   * @throws IllegalStateException if the {@link Updatable} was not added
   */
  void removeUpdatable(@NonNull Updatable updatable);
}
/**
 * Called when when an event has occurred. Can be added to {@link Observable}s to be notified
 * of {@link Observable} events.
 */
public interface Updatable {

  /**
   * Called when an event has occurred.
   */
  void update();
}

Updatable指代的是觀察者模式中的觀察者,而Observable所指代的就是觀察者模式中的被觀察者。整個agera就是建立在[使用Updatable去觀察Observable,Observable去通知Updatable更新]的基礎上進行開發的。具體到程式碼就是使用Observable的addUpdatable()方法去將Updatable註冊到Observable中,並且在合適的實際呼叫Updatable的update()方法去通知Updatable更新。下面讓我們看一個具體的例子。

首先介面很簡單,就一個Button和一個TextView,我們的目標是點選Button之後,改變TextView的文字顯示。

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:orientation="vertical"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    android:fitsSystemWindows="true"
    tools:context="zjutkz.com.guide.MainActivity">

    <Button
        android:text="trigger"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:onClick="trigger"/>

    <TextView
        android:id="@+id/show"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:text="wait for trigger..."
        android:textSize="20sp"
        android:gravity="center"/>

</LinearLayout>
public class MainActivity extends AppCompatActivity implements Updatable{

    private TextView show;

    private Observable observable = new Observable() {
        @Override
        public void addUpdatable(@NonNull Updatable updatable) {
            updatable.update();
        }

        @Override
        public void removeUpdatable(@NonNull Updatable updatable) {

        }
    };

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        show = (TextView)findViewById(R.id.show);
    }

    public void trigger(View view){
        observable.addUpdatable(this);
    }

    @Override
    public void update() {
        show.setText("update!!");
    }
}

看我們的activity的程式碼,首先我們要做的就是讓我們的activity實現Updatable這個介面,然後在update()方法中將TextView的文字進行改變。接著,創造出一個Observable,當我們點選Button的時候,使用Observable的addUpdatable()方法,而我們前面定義的那個Observable在其addUpdatable()方法中就呼叫了對應Updatable例項的update(),這樣,我們就完成了一個最簡單的事件訂閱。

但是上面的程式碼有一個很大的問題,不知道大家看出來沒有,那就是Observable和Updatable之間的通訊,完全沒有資料的存在,也就是說當你的Observable想要傳遞一些資料給Updatable的時候,通過這樣的方式是沒辦法實現的,而且不管你怎麼搞都不行,因為對應的方法引數中就沒有和資料相關的邏輯。

看到這你可能會說,”這不坑爹嗎!連資料都傳遞不了,還談什麼觀察者模式,談什麼響應式程式設計!“不要著急,這是Google故意而為之的,他們的想法就是要讓資料從Observable和Updatable中剝離,從而達到他們所期望的“Push event,pull data model”。這個我在後面和RxJava的比較中會講,RxJava是”Push data model”。

Repository

前文中最後一段雖然講明白了Google為什麼要這樣做,但是還是沒有說解決資料傳遞的方案,這個時候如果你興沖沖地去GitHub上給他們提issue,他們會這樣和你說:“你啊,不要老是想著搞個大新聞,你問我滋不滋辭資料傳遞,我當然說是滋辭的啦。“

那到底怎麼滋辭,啊不是,支援資料傳遞呢?Google已經給我們提供了一個介面,叫做Repository。

public interface Repository<T> extends Observable, Supplier<T> {}

可以看到,它繼承自Observable,說明是一個被觀察者,那這個Supplier又是什麼呢?

public interface Supplier<T> {

  /**
   * Returns an instance of the appropriate type. The returned object may or may not be a new
   * instance, depending on the implementation.
   */
  @NonNull
  T get();
}

看這個程式碼,配上介面的名字大家就可以猜出來,這是一個提供資料的東西。

綜上所述,Repository的作用就是——既是一個被觀察者,同時也提供資料給觀察者。

還是讓我們用程式碼來說話吧。

介面還是一樣,這裡不貼了,一個Button一個TextView。

private Supplier<String> supplier = new Supplier() {
        @NonNull
        @Override
        public Object get() {
            return "update!!";
        }
    };

private Repository<String> repository =     Repositories.repositoryWithInitialValue("a")
            .observe()
            .onUpdatesPerLoop()
            .thenGetFrom(supplier)
            .compile();

public void trigger(View view){
        repository.addUpdatable(this);
    }

@Override
public void update() {
    show.setText(repository.get());
}

上面的那兩個初始化程式碼大家可以先不用懂,具體看下面的,點選Button(進入trigger(View view)方法)之後,我們和剛才一樣,使用了addUpdatable將我們繼承自Updatable的activity註冊到repository中,然後repository發現有東西註冊到了自己這兒,經過一系列的方法執行,就會呼叫Updatable的update()方法,然後我們通過repository.get()去拿到對應的資料就OK了。

這裡給大家捋一捋agera中幾個基礎但是很重要的概念:

(1) Observable:agera中的被觀察者,用於在合適的時機去通知觀察者進行更新。

(2) Updatable:agera中的觀察者,用於觀察Observable。

(3) Supplier:agera中提供資料的介面,通過範型指定資料型別,通過get()方法獲取資料。

(4) Repository:agera中集成了Observable和Supplier功能的一個[提供資料的被觀察者]。

說到這裡,大家可能會有一個問題,前面說了agera是”Push event,pull data model”,也就是資料和事件分離的,那這個Repository的出現不是自己打自己的臉嗎?

其實不是的,大家可以看GitHub上wiki裡的這一句:

This does not change the push event, pull data model: the repository notifies the registered updatables to update themselves when the data changes; and the updatables pull data from the repository when they individually react to this event.

通過程式碼來解釋就是,Repository經過一系列的方法執行之後,呼叫了Updatable的update()方法,這個是事件傳遞,也就是push event,而Updatable在接收到喚醒事件之後,通過呼叫Repository的get()方法,自己去獲取資料而不是從updata()方法中拿到傳遞過來的資料,類似update(T value),這是pull data。這樣的好處是可以lazy load,這個我們在後文中會講。

agera的進階用法

講完了agera基礎的概念,讓我們來看看它的正確使用姿勢。

前面我們有講到Repository,大家通過程式碼肯定看的一頭霧水,這裡讓我們來聊聊它吧。

Repository

首先看一個例子。

private Supplier<String> strSupplier = new Supplier<String>() {
    @NonNull
    @Override
    public String get() {
        return "value";
    }
};

private Function<String,String> transform = new Function<String, String>() {
    @NonNull
    @Override
    public String apply(@NonNull String input) {
        return "new " + input;
    }
};

private Supplier<Integer> integerSupplier = new Supplier<Integer>() {
    @NonNull
    @Override
    public Integer get() {
        return 100;
    }
};

private Merger<String,Integer,String> merger = new Merger<String, Integer, String>() {
    @NonNull
    @Override
    public String merge(@NonNull String s, @NonNull Integer integer) {
        return s + "plus " + String.valueOf(integer);
    }
};

private Updatable updatable = new Updatable() {
        @Override
        public void update() {
            Log.d("TAG", repository.get());
        }
    };

repository = Repositories.repositoryWithInitialValue("default")
                .observe()
                .onUpdatesPerLoop()
                .getFrom(strSupplier)
                .transform(transform)
                .thenMergeIn(integerSupplier,merger)
                .compile();

repository.addUpdatable(updatable);

這段程式碼大家能看懂的部分我相信只有repository.addUpdatable(updatable);這一句。。

從大體上說,就是將一個updatable通過repository.addUpdatable(updatable);這個方法註冊到對應的repository中,然後repository經過一系列的方法呼叫去通知updatable更新,大家可以在logcat中看到輸出的結果是

result

那最主要的這段程式碼是什麼意思呢?

private Supplier<String> strSupplier = new Supplier<String>() {
    @NonNull
    @Override
    public String get() {
        return "value";
    }
};

private Function<String,String> transform = new Function<String, String>() {
    @NonNull
    @Override
    public String apply(@NonNull String input) {
        return "new " + input;
    }
};

private Supplier<Integer> integerSupplier = new Supplier<Integer>() {
    @NonNull
    @Override
    public Integer get() {
        return 100;
    }
};

private Merger<String,Integer,String> merger = new Merger<String, Integer, String>() {
    @NonNull
    @Override
    public String merge(@NonNull String s, @NonNull Integer integer) {
        return s + " plus " + String.valueOf(integer);
    }
};

private Updatable updatable = new Updatable() {
    @Override
    public void update() {
        Log.d("TAG", repository.get());
    }
};

repository = Repositories.repositoryWithInitialValue("default")
                .observe()
                .onUpdatesPerLoop()
                .getFrom(strSupplier)
                .transform(transform)
                .thenMergeIn(integerSupplier,merger)
                .compile();

這裡就不得不提一下RxJava了,大家知道在RxJava中存在很多幫助大家進行資料轉換的操作符,像map,flatMap,take等等,而這裡的getFrom,transform和thenMergeIn也是一樣,是Google封裝好了幫助大家進行資料操作的。而且從名字就可以看出來:

repositoryWithInitialValue意思是建立一個Repository並且賦一個初始值。

getFrom的意思是從一個Supplier那裡獲取資料。

transfrom就是進行轉換,這裡通過一個Function將repository從strSupplier那裡得到的資料前面加上一個”new”字串,這個操作符很像RxJava中的map。

而最後那個thenMergeIn則是將intergerSupplier中提供的資料和我們現在repository中的資料進行一個整合。

最後通過complie得到Repository例項。

是不是和RxJava很相似呢?就是一種可以看作流式的操作。

看到這裡大家可能又要問了,那前面的observe()和onUpdatesPerLoop()是什麼呢?為什麼最後那個叫thenMergeIn()不叫mergeIn()呢?

這裡要給大家講一個概念,agera通過這樣去建立一個Repository,是有一個state,也就是狀態的概念的。

public interface RepositoryCompilerStates {


  interface REventSource<TVal, TStart> {

    @NonNull
    RFrequency<TVal, TStart> observe(@NonNull Observable... observables);
  }

  interface RFrequency<TVal, TStart> extends REventSource<TVal, TStart> {

    @NonNull
    RFlow<TVal, TStart, ?> onUpdatesPer(int millis);

    @NonNull
    RFlow<TVal, TStart, ?> onUpdatesPerLoop();
  }

  interface RFlow<TVal, TPre, TSelf extends RFlow<TVal, TPre, TSelf>>
      extends RSyncFlow<TVal, TPre, TSelf> {

    @NonNull
    @Override
    <TCur> RFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier);

    @NonNull
    @Override
    <TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptGetFrom(
        @NonNull Supplier<Result<TCur>> attemptSupplier);

    @NonNull
    @Override
    <TAdd, TCur> RFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier,
        @NonNull Merger<? super TPre, ? super TAdd, TCur> merger);

    @NonNull
    @Override
    <TAdd, TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptMergeIn(
        @NonNull Supplier<TAdd> supplier,
        @NonNull Merger<? super TPre, ? super TAdd, Result<TCur>> attemptMerger);

    @NonNull
    @Override
    <TCur> RFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);

    @NonNull
    @Override
    <TCur> RTermination<TVal, Throwable, RFlow<TVal, TCur, ?>> attemptTransform(
        @NonNull Function<? super TPre, Result<TCur>> attemptFunction);

    @NonNull
    TSelf goTo(@NonNull Executor executor);

    @NonNull
    RSyncFlow<TVal, TPre, ?> goLazy();
  }

  interface RSyncFlow<TVal, TPre, TSelf extends RSyncFlow<TVal, TPre, TSelf>> {

    @NonNull
    <TCur> RSyncFlow<TVal, TCur, ?> getFrom(@NonNull Supplier<TCur> supplier);

    @NonNull
    <TCur>
    RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptGetFrom(
        @NonNull Supplier<Result<TCur>> attemptSupplier);

    @NonNull
    <TAdd, TCur> RSyncFlow<TVal, TCur, ?> mergeIn(@NonNull Supplier<TAdd> supplier,
        @NonNull Merger<? super TPre, ? super TAdd, TCur> merger);

    @NonNull
    <TAdd, TCur>
    RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptMergeIn(
        @NonNull Supplier<TAdd> supplier,
        @NonNull Merger<? super TPre, ? super TAdd, Result<TCur>> attemptMerger);

    @NonNull
    <TCur> RSyncFlow<TVal, TCur, ?> transform(@NonNull Function<? super TPre, TCur> function);

    @NonNull
    <TCur> RTermination<TVal, Throwable, ? extends RSyncFlow<TVal, TCur, ?>> attemptTransform(
        @NonNull Function<? super TPre, Result<TCur>> attemptFunction);

    @NonNull
    RTermination<TVal, TPre, TSelf> check(@NonNull Predicate<? super TPre> predicate);

    @NonNull
    <TCase> RTermination<TVal, TCase, TSelf> check(
        @NonNull Function<? super TPre, TCase> caseFunction,
        @NonNull Predicate<? super TCase> casePredicate);

    @NonNull
    TSelf sendTo(@NonNull Receiver<? super TPre> receiver);

    @NonNull
    <TAdd> TSelf bindWith(@NonNull Supplier<TAdd> secondValueSupplier,
        @NonNull Binder<? super TPre, ? super TAdd> binder);

    @NonNull
    RConfig<TVal> thenSkip();

    @NonNull
    RConfig<TVal> thenGetFrom(@NonNull Supplier<? extends TVal> supplier);

    @NonNull
    RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptGetFrom(
            @NonNull Supplier<? extends Result<? extends TVal>> attemptSupplier);

    @NonNull
    <TAdd> RConfig<TVal> thenMergeIn(@NonNull Supplier<TAdd> supplier,
        @NonNull Merger<? super TPre, ? super TAdd, ? extends TVal> merger);

    @NonNull
    <TAdd> RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptMergeIn(
            @NonNull Supplier<TAdd> supplier,
            @NonNull Merger<? super TPre, ? super TAdd,
                ? extends Result<? extends TVal>> attemptMerger);

    @NonNull
    RConfig<TVal> thenTransform(
        @NonNull Function<? super TPre, ? extends TVal> function);

    @NonNull
    RTermination<TVal, Throwable, RConfig<TVal>> thenAttemptTransform(
            @NonNull Function<? super TPre, ? extends Result<? extends TVal>> attemptFunction);
  }

  interface RTermination<TVal, TTerm, TRet> {

    @NonNull
    TRet orSkip();

    @NonNull
    TRet orEnd(@NonNull Function<? super TTerm, ? extends TVal> valueFunction);
  }

  interface RConfig<TVal> {

    @NonNull
    RConfig<TVal> notifyIf(@NonNull Merger<? super TVal, ? super TVal, Boolean> checker);

    @NonNull
    RConfig<TVal> onDeactivation(@RepositoryConfig int deactivationConfig);

    @NonNull
    RConfig<TVal> onConcurrentUpdate(@RepositoryConfig int concurrentUpdateConfig);

    @NonNull
    Repository<TVal> compile();


    @NonNull
    <TVal2> RFrequency<TVal2, TVal> compileIntoRepositoryWithInitialValue(@NonNull TVal2 value);
  }

我們可以看到這個介面,裡面的方法很多,不過只要仔細看就會發現它裡面定義的方法正是我們剛才repository中為了操作資料而使用的,不同的是,它們的返回並不是Repository,而是一些其他的東西。而這個返回值,就表示了Repository正在處理的資料的狀態。

這裡給大家總結一下幾種代表性的狀態,其他沒提到的都是繼承自其中的一個,表示的狀態是差不多的。

REventSource:這個是最初的狀態,Repositories.repositoryWithInitialValue()這個方法的返回值就是REventSource,表明事件源的開始。

RFrequency:表示事件源傳送的頻率。

RFlow:表示資料處理流,這裡定義的方法都是和資料處理相關的,比如getFrom(),mergeIn()等等。可以看到,getFrom()這樣的方法返回值都是RFlow,說明我們可以流式的呼叫,比如在getFrom()後面呼叫mergeIn(),但是其餘的thenXXX()返回的都是RTermination,說明如果你呼叫了這樣的方法,那麼資料處理流也就結束了。

RTermination:表示最後終止資料處理流。

RConfig:其餘各種配置,比如notifyIf()這樣的是否要喚醒Updatable等等。

通過這樣定義狀態,我們可以很清晰的知道現在處理什麼狀態,也能更好的理解整個函式的呼叫過程。

初始化(Repositories.repositoryWithInitialValue(…))->

表示事件開始(observe())->

規定事件傳送的頻率(onUpdatesPerLoop()或者onUpdatesPer(…))->

處理資料流(各種處理函式)->

結束資料流->

配置一些屬性(notifyIf(…)等等)->

complie()。

整個過程是不可逆的,也就是說你不能在呼叫了thenMergeIn()之後去呼叫類似getFrom()這樣的函式,你呼叫了thenXXX()就表示你要結束這個資料處理流了。

說到這裡我們就說完了整個Repository資料處理流的過程,但是我們會發現,上面看到的程式碼都只是一個抽象的介面,那麼具體的實現在哪裡呢?(這裡為了讓大家更好的理解agera,要看一點原始碼了,雖然標題是進階使用。。)

讓我們回頭最開始,看一下Repositories.repositoryWithInitialValue()這個函式。

@NonNull
public static <T> REventSource<T, T> repositoryWithInitialValue(@NonNull final T initialValue) {
  return RepositoryCompiler.repositoryWithInitialValue(initialValue);
}

呼叫了RepositoryCompiler的同名函式。讓我們看看RepositoryCompiler是個啥東西。

final class RepositoryCompiler implements
    RepositoryCompilerStates.RFrequency,
    RepositoryCompilerStates.RFlow,
    RepositoryCompilerStates.RTermination,
    RepositoryCompilerStates.RConfig {

    .......
}

我們驚奇的發現,它實現了上面提到的那些介面,也就是說RepositoryCompiler就是agera用來管理Repository資料處理流狀態的類。讓我們看看最後compiler()方法到底生成了怎樣一個Repository。

@NonNull
@Override
public Repository compile() {
  Repository repository = compileRepositoryAndReset();
  recycle(this);
  return repository;
}

@NonNull
  private Repository compileRepositoryAndReset() {
    checkExpect(CONFIG);
    Repository repository = CompiledRepository.compiledRepository(initialValue, eventSources, frequency, directives,
        notifyChecker, concurrentUpdateConfig, deactivationConfig);
    expect = NOTHING;
    initialValue = null;
    eventSources.clear();
    frequency = 0;
    directives.clear();
    goLazyUsed = false;
    notifyChecker = objectsUnequal();
    deactivationConfig = RepositoryConfig.CONTINUE_FLOW;
    concurrentUpdateConfig = RepositoryConfig.CONTINUE_FLOW;
    return repository;
  }

可以看到呼叫了CompiledRepository的compiledRepository方法。

@NonNull
static Repository compiledRepository(
    @NonNull final Object initialValue,
    @NonNull final List<Observable> eventSources,
    final int frequency,
    @NonNull final List<Object> directives,
    @NonNull final Merger<Object, Object, Boolean> notifyChecker,
    @RepositoryConfig final int concurrentUpdateConfig,
    @RepositoryConfig final int deactivationConfig) {
  Observable eventSource = perMillisecondObservable(frequency,
      compositeObservable(eventSources.toArray(new Observable[eventSources.size()])));
  Object[] directiveArray = directives.toArray();
  return new CompiledRepository(initialValue, eventSource,
      directiveArray, notifyChecker, deactivationConfig, concurrentUpdateConfig);
}

分析到這裡我們就清楚了,我們使用的Repository,原來都是compiledRepository!

其實這些類的名字已經幫我們很好的理解了整個流程。

首先第一步是呼叫Repositories.repositoryWithInitialValue()。[Repositories]這個名字就是一個utils類,說明是幫助我們生成Respository的。

後面的各種狀態處理都在RepositoryCompiler類中,意思是Repository的編譯者,專門為了生成Repository而創造的。

最後生成的是CompiledRepository,表示編譯過後的Repository,擁有完善的功能。

好了,到這裡關於Repository的東西就講完了,大家可以嘗試著自己去寫一下,這些個資料處理的方法能讓我們像RxJava一樣輕鬆的處理資料。當然,agera也提供了非同步操作的封裝,like this:

private Executor executor = Executors.newSingleThreadExecutor();

repository = Repositories.repositoryWithInitialValue("default")
        .observe()
        .onUpdatesPerLoop()
        .goTo(executor)
        .thenGetFrom(new Supplier<Object>() {
            @NonNull
            @Override
            public Object get() {
                //some biz work,may be block the main thread.
                return null;
            }
        })
        .compile();

使用goTo操作符就可以了。

Attempt & Result

在上面的例子中,我們使用了Repository去代替原始的Observable,配合上操作符已經能初步完成我們的各種需求了。但是這裡有一個問題,萬一在Supplier的get()方法中發生了錯誤呢?比如這樣

private Supplier<Integer> strSupplier = new Supplier<Integer>() {
    @NonNull
    @Override
    public Integer get() {
        return 1/0;
    }
};

當然這種程式碼在實際情況下是不會產生的,但是總會有錯誤發生啊,對於RxJava,它有很好的error handling機制,那agera有嗎?答案是有的。就是通過操作符attemptXXX()和Result類來解決。

首先看一段程式碼

repository = Repositories.repositoryWithInitialValue(0)
        .observe()
        .onUpdatesPerLoop()
        .thenGetFrom(strSupplier)
        .compile();

repository.addUpdatable(this);

如果使用我們剛才的方式去做,strSupplier的get()方法中return 1/0,這樣就爆炸了。。程式直接退出,你一天美好的心情就此終結。但是如果這樣

private Supplier<Result<Integer>> safeStrSupplier = new Supplier<Result<Integer>>() {
        @NonNull
        @Override
        public Result<Integer> get() {
            try{
                return Result.success(1/ 0);
            }catch (ArithmeticException e){
                return Result.failure(e);
            }
        }
    };

safeRepository = Repositories.repositoryWithInitialValue(Result.<Integer>absent())
        .observe()
        .onUpdatesPerLoop()
        .attemptGetFrom(safeStrSupplier).orEnd(new Function<Throwable, Result<Integer>>() {
            @NonNull
            @Override
            public Result<Integer> apply(@NonNull Throwable input) {
                return Result.success(2222);
            }
        })
        .thenTransform(new Function<Integer, Result<Integer>>() {
            @NonNull
            @Override
            public Result<Integer> apply(@NonNull Integer input) {
                return Result.absentIfNull(input);
            }
        })
        .compile();

safeRepository.addUpdatable(this);

可以看到,我們嘗試用attempGetFrom()去代替getFrom(),後面跟上了orEnd(),這裡你也可以使用orSkip()兩個函式的差別是如果接受到了異常,前者還是會通知Updatable去更新,而後者直接跳過。Supplier也有差別,我們在safeSupplier中使用Result類去包裹住了我們操作的資料,並且通過呼叫success()或者failure()去執行成功或者失敗。

所以這裡,如果你寫了1/0這樣的程式碼並且引發了異常,我們可以安全的捕獲它並且做你想要做的操作。另外大家可以看thenTransform()中,我們return Result.absentIfNull(input);表示如果資料是空的,我們就返回預設值。

我們在日常編碼中,儘量要採用這樣的方式去防止異常的發生。

Receiver

上面說了Result,這裡我們可以使用Receiver去配合Result進行使用。

private Receiver<Throwable> errorReceiver = new Receiver<Throwable>() {
        @Override
        public void accept(@NonNull Throwable value) {
            trigger.setText(value.toString());
        }
    };

private Receiver<Integer> successReceiver = new Receiver<Integer>() {
        @Override
        public void accept(@NonNull Integer value) {
            trigger.setText(String.valueOf(value));
        }
    };

@Override
public void update() {
    safeRepository.get()
            .ifFailedSendTo(errorReceiver)
            .ifSucceededSendTo(successReceiver);
}

看上面這段程式碼,和上一節的程式碼一樣,我們safeRepository指定的範型是Result,所以在update()方法中get到的就是一個Result,它的ifFailedSendTo()和ifFailedSendTo()表示如果整個資料流成功傳送給xx或者失敗傳送給xx,這裡的xx必須要實現Receiver介面。

/**
 * A receiver of objects.
 */
public interface Receiver<T> {

  /**
   * Accepts the given {@code value}.
   */
  void accept(@NonNull T value);
}

然後我們可以在accept()方法中拿到對應的值進行操作。

Reservoir

這個東西呢,簡單來說就是響應式程式設計中的queue,用來進行生產者/消費者操作的。

public interface Reservoir<T> extends Receiver<T>, Repository<Result<T>> {}

可以看到它繼承自Receiver和Repository,所以它可以使用accept()去接受資料,也可以使用get()去返回資料。

我們在使用中通過呼叫下面的程式碼去獲取一個Reservior。

private Reservoir<String> provider = Reservoirs.reservoir();

跟蹤Reservoirs的原始碼看一下。

@NonNull
public static <T> Reservoir<T> reservoir(@NonNull final Queue<T> queue) {
  return new SynchronizedReservoir<>(checkNotNull(queue));
}

private static final class SynchronizedReservoir<T> extends BaseObservable
      implements Reservoir<T> {
    @NonNull
    private final Queue<T> queue;

    private SynchronizedReservoir(@NonNull final Queue<T> queue) {
      this.queue = checkNotNull(queue);
    }

    @Override
    public void accept(@NonNull T value) {
      boolean shouldDispatchUpdate;
      synchronized (queue) {
        boolean wasEmpty = queue.isEmpty();
        boolean added = queue.offer(value);
        shouldDispatchUpdate = wasEmpty && added;
      }
      if (shouldDispatchUpdate) {
        dispatchUpdate();
      }
    }

    @NonNull
    @Override
    public Result<T> get() {
      T nullableValue;
      boolean shouldDispatchUpdate;
      synchronized (queue) {
        nullableValue = queue.poll();
        shouldDispatchUpdate = !queue.isEmpty();
      }
      if (shouldDispatchUpdate) {
        dispatchUpdate();
      }
      return absentIfNull(nullableValue);
    }

    @Override
    protected void observableActivated() {
      synchronized (queue) {
        if (queue.isEmpty()) {
          return;
        }
      }
      dispatchUpdate();
    }
  }

可以看到SynchronizedReservoir中有一個queue,accpet的時候去存放資料,get的時候去取出資料。

很慚愧,這裡關於Reservio我還不是非常的明白,只知道如何用,不知道為什麼這樣用,所以這裡就不給大家過多的介紹了,以免讓產生大家錯誤的理解。有興趣的同學可以去看這頁wiki。

Function的使用

通過前面的學習我們知道了agera和RxJava一樣存在很多使用的操作符,但是讓我們想象一下,如果有一個非常複雜的操作,那我們是不是要寫一堆的transform()這樣的操作符呢?我相信這樣做是可以的,但是再考慮一點,對於一個通用的操作,你這樣去使用怎麼達到複用的目的呢?難道5個頁面都有想用的操作,你要每個頁面都去寫一遍嗎?

Google顯示不會讓我們陷入這樣的窘境,所以就有了[Functions]這個類。

看名字就知道,和之前的Repositories一樣,它是一個工具類。它可以將多個Function有機地結合在一起。

private Supplier<String> supplier = new Supplier<String>() {
    @NonNull
    @Override
    public String get() {
        return "url";
    }
};

private Function<String,List<Integer>> strToList = new Function<String, List<Integer>>() {
    @NonNull
    @Override
    public List<Integer> apply(@NonNull String input) {
        List<Integer> data = new ArrayList<>();
        for(int i = 0;i < 10;i++){
            data.add(i);
        }
        return data;
    }
};

private Predicate<Integer> filter = new Predicate<Integer>() {
    @Override
    public boolean apply(@NonNull Integer value) {
        return value > 5;
    }
};

private Function<Integer,String> intToStr = new Function<Integer, String>() {
    @NonNull
    @Override
    public String apply(@NonNull Integer input) {
        return String.valueOf(input);
    }
};

private Function<List<String>, Integer> getSize = new Function<List<String>, Integer>() {
    @NonNull
    @Override
    public Integer apply(@NonNull List<String> input) {
        return input.size();
    }
};

Function<String,Integer> finalFunc = Functions.functionFrom(String.class)
        .unpack(strToList)
        .filter(filter)
        .map(intToStr)
        .thenApply(getSize);

private Repository<String> repository;

repository = Repositories.repositoryWithInitialValue("default")
                .observe()
                .onUpdatesPerLoop()
                .getFrom(supplier)
                .transform(finalFunc)
                .thenTransform(new Function<Integer, String>() {
                    @NonNull
                    @Override
                    public String apply(@NonNull Integer input) {
                        return String.valueOf(input);
                    }
                })
                .compile();

repository.addUpdatable(this);

其中重點關注

Function<String,Integer> finalFunc = Functions.functionFrom(String.class)
        .unpack(strToList)
        .filter(filter)
        .map(intToStr)
        .thenApply(getSize);

Functions類提供的各種操作符類似unpack(),filter()等,將一個個操作符連了起來並且生成一個最終的操作符。我們就可以拿這個操作符放到我們的Repository的資料處理狀態機中,並且你還可以把這樣的finalFunc儲存起來,哪裡要用了直接拿出來用,達到複用的目的。

到這兒關於agera的進階使用也說完了。怎麼說呢,這裡我也是帶