Vert.x Java開發指南——第九章 利用RxJava進行響應式程式設計
第九章 利用RxJava進行響應式程式設計
截止目前,我們已經探索了Vert.x技術棧的多個部分,使用基於回撥的API。它僅僅可以正常工作,而且這個程式設計模型對於開發者在許多語言中是非常熟悉的。儘管如此,它可能有點繁瑣,尤其當你組合幾個事件源或者處理複雜的資料流時。
而這正是RxJava閃耀的地方,Vert.x無縫的集成了它。
9.1 啟用RxJava API
除了基於回撥的API,Vert.x模組提供了一套“Rxified”API。為了啟用它,首先需要新增vertx-rx-java模組到Maven POM檔案:
<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-rx-java</artifactId> </dependency>
Verticle現在需要修改為繼承自io.vertx.rxjava.core.AbstractVerticle而不是io.vertx.core.AbstractVerticle。這有什麼不同?前一個類擴充套件了後者,並暴露了一個io.vertx.rxjava.core.Vertx型別的屬性。
io.vertx.rxjava.core.Vertx定義了額外的rxSomething(…)方法,這相當於基於回撥的對等體。
讓我們看一下MainVerticle,以便在實踐中更好地瞭解它是如何工作的:
Single<String> dbVerticleDeployment = vertx.rxDeployVerticle("io.vertx.guides.wiki.database.WikiDatabaseVerticle" );
rxDeploy方法沒有使用一個Handler
9.2 按順序部署Verticle
為了完成MainVerticle重構,我們必須確保部署操作被觸發並按順序發生:
dbVerticleDeployment.flatMap(id -> { ①
Single<String> httpVerticleDeployment = vertx.rxDeployVerticle( "io.vertx.guides.wiki.http.HttpServerVerticle",new DeploymentOptions().setInstances(2));
return httpVerticleDeployment;
}).subscribe(id->startFuture.complete(),startFuture::fail); ②
① flatMap方法應用該函式到dbVerticleDeployment的結果。此處它排程HttpServerVerticle的部署。
② 操作是在訂閱時啟動。根據結果成功還是失敗,MainVerticle呼叫startFuture的complete或者fail方法。
9.3 部分“Rxifying”的HttpServerVerticle
如果你按順序閱讀指南,並按照講解編輯你的程式碼,那麼你的HttpServerVerticle類依舊使用基於回撥的API。在你順理成章的使用RxJava API執行非同步操作之前,如併發,你需要重構HttpServerVerticle。
9.3.1 匯入Vert.x類的RxJava版本
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.http.HttpServer;
import io.vertx.rxjava.ext.auth.AuthProvider;
import io.vertx.rxjava.ext.auth.User;
import io.vertx.rxjava.ext.auth.jwt.JWTAuth;
import io.vertx.rxjava.ext.auth.shiro.ShiroAuth;
import io.vertx.rxjava.ext.web.Router;
import io.vertx.rxjava.ext.web.RoutingContext;
import io.vertx.rxjava.ext.web.client.WebClient;
import io.vertx.rxjava.ext.web.client.HttpResponse; ①
import io.vertx.rxjava.ext.web.codec.BodyCodec;
import io.vertx.rxjava.ext.web.handler.*;
import io.vertx.rxjava.ext.web.sstore.LocalSessionStore;
import io.vertx.rxjava.ext.web.templ.FreeMarkerTemplateEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Single;
① 我們的backupHandler()方法依舊使用HttpResponse類,因此它必須被匯入。事實證明,Vert.x提供的RxJava版本的HttpResponse可以作為這種情況下的替代。在本指南倉庫step-8目錄中的“Rxified”程式碼沒有匯入這個類,因為響應型別是由lambda表示式推斷的。
9.3.2 在一個“Rxified” vertx例項上使用委派
當你有一個io.vertx.rxjava.core.Vertx物件,並希望對io.vertx.core.Vertx例項進行方法呼叫時,可以呼叫getDelegate()方法。Verticle的start()方法需要調整,當建立一個WikiDatabaseService時:
@Override
public void start(Future<Void> startFuture) throws Exception {
String wikiDbQueue = config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue");
dbService = io.vertx.guides.wiki.database.WikiDatabaseService.createProxy(vertx.getDelegate(), wikiDbQueue);
9.4 併發執行授權查詢
在前面的例子中,我們看到了如何使用RxJava和Rxified Vert.x API按順序執行非同步操作。但是有時候這種保證(譯者注:指按順序執行非同步操作)是不需要的,或許你只是出於效能原因需要它們簡單的併發執行。
HttpServerVerticle的JWT令牌生成過程是這種情況的一個好例子。為了建立一個令牌,我們需要所有的授權查詢結果完成,但是查詢是相互獨立的:
auth.rxAuthenticate(creds).flatMap(user -> {
Single<Boolean> create = user.rxIsAuthorised("create"); ①
Single<Boolean> delete = user.rxIsAuthorised("delete");
Single<Boolean> update = user.rxIsAuthorised("update");
return Single.zip(create, delete, update, (canCreate, canDelete, canUpdate) -> { ②
return jwtAuth.generateToken(
new JsonObject()
.put("username", context.request().getHeader("login")) .put("canCreate", canCreate)
.put("canDelete", canDelete)
.put("canUpdate", canUpdate),
new JWTOptions() .setSubject("Wiki API") .setIssuer("Vert.x"));
});
}).subscribe(token -> {
context.response().putHeader("Content-Type", "text/plain").end(token);
}, t -> context.fail(401));
① 建立了三個Single物件,表示不同的授權查詢。
② 當三個操作成功完成時,執行zip操作的回撥方法,使用前面三個操作的結果。
9.5 使用資料庫連結
為了從池中獲取一個數據庫連結,所有你需要做的就是呼叫JDBCClient上的rxGetConnection方法:
Single<SQLConnection> connection = dbClient.rxGetConnection();
這個方法返回了一個Single,你可以輕易使用flatMap變換來執行SQL查詢:
Single<ResultSet> resultSet = dbClient.rxQueryWithParams( sqlQueries.get(SqlQuery.GET_PAGE_BY_ID), new JsonArray().add(id));
但是,如果SQLConnection引用不再可達,我們怎麼釋放該連結?一個簡單而且方便的方法是當Single取消訂閱時執行close:
private Single<SQLConnection> getConnection() {
return dbClient.rxGetConnection().flatMap(conn -> {
Single<SQLConnection> connectionSingle = Single.just(conn); ①
return connectionSingle.doOnUnsubscribe(conn::close); ②
});
}
① 在獲取連結之後,我們將其封裝為一個Single。
② Single修改為當取消訂閱時,呼叫close。
現在我們可以在資料庫Verticle中任何需要執行SQL查詢的時候使用getConnection。
9.6 消除回撥和RxJava之間的差距
有時,你可能必須混合RxJava程式碼和基於回撥的API。例如,服務代理介面只能定義為回撥的方式,但是它的實現使用了Vert.x Rxified API。
這種情況下,io.vertx.rx.java.RxHelper類可以適配Handler
@Override
public WikiDatabaseService fetchAllPagesData(Handler<AsyncResult<List<JsonObject>>> resultHandler) { ①
dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES_DATA))
.map(ResultSet::getRows)
.subscribe(RxHelper.toSubscriber(resultHandler)); ②
return this;
}
① fetchAllPagesData是一個非同步服務代理操作,其定義使用了Handler
9.7 資料流
RxJava不僅是合併不同事件來源的偉大工具,它對於資料流也非常有幫助。不像Vert.x future或者JDK future,Observable發出一個事件流,而不僅是一個單獨的事件,並且它擁有一個廣泛的資料操作運算集。
我們可以使用這些操作中一些來重構資料庫Verticle中的fetchAllPages方法:
public WikiDatabaseService fetchAllPages(Handler<AsyncResult<JsonArray>> resultHandler) {
dbClient.rxQuery(sqlQueries.get(SqlQuery.ALL_PAGES))
.flatMapObservable(res -> { ①
List<JsonArray> results = res.getResults();
return Observable.from(results); ②
})
.map(json->json.getString(0)) ③
.sorted() ④
.collect(JsonArray::new, JsonArray::add) ⑤
.subscribe(RxHelper.toSubscriber(resultHandler));
return this;
}
① 通過flatMapObservable,我們可以使用Single發出的條目建立一個Observable。
② from將資料庫results迭代轉換成一個Observable,該Observable發出資料庫行條目。
③ 由於我們只需要頁面名稱,我們可以map每個JsonObject行到首列。
④ 客戶端希望資料按照字母表順序sorted。
⑤ 事件匯流排服務應答包含在一個單獨的JsonArray中。collect方法通過JsonArray::new建立一個新的物件,然後當條目發出時通過JsonArray::add方法新增它們。