什麼是響應式程式設計,Java 如何實現
我們這裡用通過唯一 id 獲取知乎的某個回答作為例子,首先我們先明確下,一次HTTP請求到伺服器上處理完之後,將響應寫回這次請求的連線,就是完成這次請求了,如下:
public void request(Connection connection, HttpRequest request) {
//處理request,省略程式碼
connection.write(response);//完成響應
}
假設獲取回答需要呼叫兩個介面,獲取評論數量還有獲取回答資訊,傳統的程式碼可能會這麼去寫:
//獲取評論數量 public void getCommentCount(Connection connection, HttpRequest request) { Integer commentCount = null; try { //從快取獲取評論數量,阻塞IO commentCount = getCommnetCountFromCache(id); } catch(Exception e) { try { //快取獲取失敗就從資料庫中獲取,阻塞IO commentCount = getVoteCountFromDB(id); } catch(Exception ex) { } } connection.write(commentCount); } //獲取回答 public void getAnswer(Connection connection, HttpRequest request) { //獲取點贊數量 Integer voteCount = null; try { //從快取獲取點贊數量,阻塞IO voteCount = getVoteCountFromCache(id); } catch(Exception e) { try { //快取獲取失敗就從資料庫中獲取,阻塞IO voteCount = getVoteCountFromDB(id); } catch(Exception ex) { } } //從資料庫獲取回答資訊,阻塞IO Answer answer = getAnswerFromDB(id); //拼裝Response ResultVO response = new ResultVO(); if (voteCount != null) { response.setVoteCount(voteCount); } if (answer != null) { response.setAnswer(answer); } connection.write(response);//完成響應 }
在這種實現下,你的程序只需要一個執行緒池,承載了所有請求。這種實現下,有兩個弊端:
- 執行緒池 IO 阻塞,導致某個儲存變慢或者快取擊穿的話,所有服務都堵住了。假設現在評論快取突然掛了,全都訪問資料庫,導致請求變慢。由於執行緒需要等待 IO 響應,導致唯一一個執行緒池被堆滿,無法處理獲取回答的請求。
- 對於獲取回答資訊,獲取點贊數量其實和獲取回答資訊是可以併發進行的。不用非得先獲取點贊數量之後再獲取回答資訊。
現在,NIO 非阻塞 IO 很普及了,有了非阻塞 IO,我們可以通過響應式程式設計,來讓我們的執行緒不會阻塞,而是一直在處理請求。這是如何實現的呢?
傳統的 BIO,是執行緒將資料寫入 Connection 之後,當前執行緒進入 Block 狀態,直到響應返回,之後接著做響應返回後的動作。NIO 則是執行緒將資料寫入 Connection 之後,將響應返回後需要做的事情以及引數快取到一個地方
那麼,怎樣實現快取響應返回後需要做的事情以及引數的呢?Java 本身提供了兩種介面,一個是基於回撥的 Callback 介面(Java 8 引入的各種Functional Interface),一種是 Future 框架。
基於 Callback 的實現:
//獲取回答 public void getAnswer(Connection connection, HttpRequest request) { ResultVO resultVO = new ResultVO(); getVoteCountFromCache(id, (count, throwable) -> { //異常不為null則為獲取失敗 if (throwable != null) { //讀取快取失敗就從資料庫獲取 getVoteCountFromDB(id, (count2, throwable2) -> { if (throwable2 == null) { resultVO.setVoteCount(voteCount); } //從資料庫讀取回答資訊 getAnswerFromDB(id, (answer, throwable3) -> { if (throwable3 == null) { resultVO.setAnswer(answer); connection.write(resultVO); } else { connection.write(throwable3); } }); }); } else { //獲取成功,設定voteCount resultVO.setVoteCount(voteCount); //從資料庫讀取回答資訊 getAnswerFromDB(id, (answer, throwable2) -> { if (throwable2 == null) { resultVO.setAnswer(answer); //返回響應 connection.write(resultVO); } else { //返回錯誤響應 connection.write(throwable2); } }); } }); }
可以看出,隨著呼叫層級的加深,callback 層級越來越深,越來越難寫,而且囉嗦的程式碼很多。並且,基於 CallBack 想實現獲取點贊數量其實和獲取回答資訊併發是很難寫的,這裡還是先獲取點贊數量之後再獲取回答資訊。
那麼基於 Future 呢?我們用 Java 8 之後引入的 CompletableFuture
來試著實現下。
//獲取回答
public void getAnswer(Connection connection, HttpRequest request) {
ResultVO resultVO = new ResultVO();
//所有的非同步任務都執行完之後要做的事情
CompletableFuture.allOf(
getVoteCountFromCache(id)
//發生異常,從資料庫讀取
.exceptionallyComposeAsync(throwable -> getVoteCountFromDB(id))
//讀取完之後,設定VoteCount
.thenAccept(voteCount -> {
resultVO.setVoteCount(voteCount);
}),
getAnswerFromDB(id).thenAccept(answer -> {
resultVO.setAnswer(answer);
})
).exceptionallyAsync(throwable -> {
connection.write(throwable);
}).thenRun(() -> {
connection.write(resultVO);
});
}
這種實現就看上去簡單多了,並且讀取點贊數量還有讀取回答內容是同時進行的。
Project Reactor 在 Completableuture 這種實現的基礎上,增加了更多的組合方式以及更完善的異常處理機制,以及面對背壓時候的處理機制,還有重試機制。
每日一刷,輕鬆提升技術,斬獲各種offer: