1. 程式人生 > 其它 >什麼是響應式程式設計,Java 如何實現

什麼是響應式程式設計,Java 如何實現

技術標籤:每日一面java

我們這裡用通過唯一 id 獲取知乎的某個回答作為例子,首先我們先明確下,一次HTTP請求到伺服器上處理完之後,將響應寫回這次請求的連線,就是完成這次請求了,如下:

public void request(Connection connection, HttpRequest request) {
    //處理request,省略程式碼
    connection.write(response);//完成響應
}

假設獲取回答需要呼叫兩個介面,獲取評論數量還有獲取回答資訊,傳統的程式碼可能會這麼去寫:

//獲取評論數量
public void getCommentCount(Connection connection, HttpRequest request) {
    Integer commentCount = null;
    try {
        //從快取獲取評論數量,阻塞IO
        commentCount = getCommnetCountFromCache(id);
    } catch(Exception e) {
        try {
            //快取獲取失敗就從資料庫中獲取,阻塞IO
            commentCount = getVoteCountFromDB(id);
        } catch(Exception ex) {
    
        }
    }
    connection.write(commentCount);
}

//獲取回答
public void getAnswer(Connection connection, HttpRequest request) {
    //獲取點贊數量
    Integer voteCount = null;
    try {
        //從快取獲取點贊數量,阻塞IO
        voteCount = getVoteCountFromCache(id);
    } catch(Exception e) {
        try {
            //快取獲取失敗就從資料庫中獲取,阻塞IO
            voteCount = getVoteCountFromDB(id);
        } catch(Exception ex) {
    
        }
    }
    //從資料庫獲取回答資訊,阻塞IO
    Answer answer = getAnswerFromDB(id);
    //拼裝Response
    ResultVO response = new ResultVO();
    if (voteCount != null) {
        response.setVoteCount(voteCount);
    }
    if (answer != null) {
        response.setAnswer(answer);
    }
    connection.write(response);//完成響應
}

在這種實現下,你的程序只需要一個執行緒池,承載了所有請求。這種實現下,有兩個弊端:

  1. 執行緒池 IO 阻塞,導致某個儲存變慢或者快取擊穿的話,所有服務都堵住了。假設現在評論快取突然掛了,全都訪問資料庫,導致請求變慢。由於執行緒需要等待 IO 響應,導致唯一一個執行緒池被堆滿,無法處理獲取回答的請求。
  2. 對於獲取回答資訊,獲取點贊數量其實和獲取回答資訊是可以併發進行的。不用非得先獲取點贊數量之後再獲取回答資訊。

現在,NIO 非阻塞 IO 很普及了,有了非阻塞 IO,我們可以通過響應式程式設計,來讓我們的執行緒不會阻塞,而是一直在處理請求。這是如何實現的呢?

傳統的 BIO,是執行緒將資料寫入 Connection 之後,當前執行緒進入 Block 狀態,直到響應返回,之後接著做響應返回後的動作。NIO 則是執行緒將資料寫入 Connection 之後,將響應返回後需要做的事情以及引數快取到一個地方

之後,直接返回。在有響應返回後,NIO 的 Selector 的 Read 事件會是 Ready 狀態,掃描 Selector 事件的執行緒,會告訴你的執行緒池資料好了,然後執行緒池中的某個執行緒,拿出剛剛快取的要做的事情還有引數,繼續處理。

那麼,怎樣實現快取響應返回後需要做的事情以及引數的呢?Java 本身提供了兩種介面,一個是基於回撥的 Callback 介面(Java 8 引入的各種Functional Interface),一種是 Future 框架。

基於 Callback 的實現:

//獲取回答
public void getAnswer(Connection connection, HttpRequest request) {
    ResultVO resultVO = new ResultVO();
    getVoteCountFromCache(id, (count, throwable) -> {
        //異常不為null則為獲取失敗
        if (throwable != null) {
            //讀取快取失敗就從資料庫獲取
            getVoteCountFromDB(id, (count2, throwable2) -> {
                if (throwable2 == null) {
                    resultVO.setVoteCount(voteCount);
                }
                //從資料庫讀取回答資訊
                getAnswerFromDB(id, (answer, throwable3) -> {
                    if (throwable3 == null) {
                        resultVO.setAnswer(answer);
                        connection.write(resultVO);
                    } else {
                        connection.write(throwable3);
                    }
                });
            });
        } else {
            //獲取成功,設定voteCount
            resultVO.setVoteCount(voteCount);
            //從資料庫讀取回答資訊
            getAnswerFromDB(id, (answer, throwable2) -> {
                if (throwable2 == null) {
                    resultVO.setAnswer(answer);
                    //返回響應
                    connection.write(resultVO);
                } else {
                    //返回錯誤響應
                    connection.write(throwable2);
                }
            });
        }
    });
}

可以看出,隨著呼叫層級的加深,callback 層級越來越深,越來越難寫,而且囉嗦的程式碼很多。並且,基於 CallBack 想實現獲取點贊數量其實和獲取回答資訊併發是很難寫的,這裡還是先獲取點贊數量之後再獲取回答資訊。

那麼基於 Future 呢?我們用 Java 8 之後引入的 CompletableFuture 來試著實現下。

//獲取回答
public void getAnswer(Connection connection, HttpRequest request) {
    ResultVO resultVO = new ResultVO();
        //所有的非同步任務都執行完之後要做的事情
        CompletableFuture.allOf(
                getVoteCountFromCache(id)
                        //發生異常,從資料庫讀取
                        .exceptionallyComposeAsync(throwable -> getVoteCountFromDB(id))
                        //讀取完之後,設定VoteCount
                        .thenAccept(voteCount -> {
                    resultVO.setVoteCount(voteCount);
                }),
                getAnswerFromDB(id).thenAccept(answer -> {
                    resultVO.setAnswer(answer);
                })
        ).exceptionallyAsync(throwable -> {
            connection.write(throwable);
        }).thenRun(() -> {
            connection.write(resultVO);
        });
}

這種實現就看上去簡單多了,並且讀取點贊數量還有讀取回答內容是同時進行的。
Project Reactor 在 Completableuture 這種實現的基礎上,增加了更多的組合方式以及更完善的異常處理機制,以及面對背壓時候的處理機制,還有重試機制

每日一刷,輕鬆提升技術,斬獲各種offer:

image