1. 程式人生 > >Spring船新版推出的WebFlux,是兄弟就來學我

Spring船新版推出的WebFlux,是兄弟就來學我

watermark stat ren 開啟 lan append 做的 result ror

初識SpringWebFlux

Spring WebFlux是Spring Framework 5.0中引入的新的響應式Web框架。 與Spring MVC不同,它不需要Servlet API,完全異步和非阻塞, 並通過Reactor項目實現Reactive Streams規範,所以性能更高。 並且可以在諸如Netty,Undertow和Servlet 3.1+容器的服務器上運行。

Spring WebFlux特性:

  1. 異步非阻塞:

    眾所周知,SpringMVC是同步阻塞的IO模型,資源浪費相對來說比較嚴重,當我們在處理一個比較耗時的任務時,例如:上傳一個比較大的文件,首先,服務器的線程一直在等待接收文件,在這期間它就像個傻子一樣等在那,什麽都幹不了,好不容易等到文件來了並且接收完畢,我們又要將文件寫入磁盤,在這寫入的過程中,這根線程又再次懵bi了,又要等到文件寫完才能去幹其它的事情。這一前一後的等待,不浪費資源麽?

    沒錯,Spring WebFlux就是來解決這問題的,Spring WebFlux可以做到異步非阻塞。還是上面那上傳文件的例子,Spring WebFlux是這樣做的:線程發現文件還沒準備好,就先去做其它事情,當文件準備好之後,通知這根線程來處理,當接收完畢寫入磁盤的時候(根據具體情況選擇是否做異步非阻塞),寫入完畢後通知這根線程再來處理(異步非阻塞情況下)。相對SpringMVC而言,可以節省系統資源以及支持更高的並發量。

  2. 響應式(reactive)函數編程:

    Spring WebFlux支持函數式編程,得益於對於reactive-stream的支持(通過reactor框架來實現的)

  3. 不再拘束於Servlet容器:

    以前,我們的應用都運行於Servlet容器之中,例如我們大家最為熟悉的Tomcat, Jetty...等等。而現在Spring WebFlux不僅能運行於傳統的Servlet容器中(前提是容器要支持Servlet3.1,因為非阻塞IO是使用了Servlet3.1的特性),還能運行在支持NIO的Netty和Undertow中。

Spring WebFlux與Spring MVC的對比圖:
技術分享圖片

Spring WebFlux支持兩種編程方式:
技術分享圖片


異步servlet

在學習webflux之前,我們首先要學習一下異步的servlet。我們需要了解同步servlet阻塞了什麽?為什麽需要異步servlet?異步servlet能支持高吞吐量的原理是什麽?

servlet容器(如tomcat)裏面,每處理一個請求會占用一個線程,同步servlet裏面,業務代碼處理多久,servlet容器的線程就會等(阻塞)多久,而servlet容器的線程是由上限的,當請求多了的時候servlet容器線程就會全部用完,就無法再處理請求(這個時候請求可能排隊也可能丟棄,得看如何配置),就會限制了應用的吞吐量!

而異步serlvet裏面,servlet容器的線程不會傻等業務代碼處理完畢,而是直接返回(繼續處理其他請求),給業務代碼一個回調函數(asyncContext.complete()),業務代碼處理完了再通知我!這樣就可以使用少量的線程處理更加高的請求,從而實現高吞吐量!

我們來看一個同步Servlet的示例代碼:

package org.example.servlet;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * @program: servlet-demo
 * @description: 同步的Servlet Demo
 * @author: 01
 * @create: 2018-10-04 17:02
 **/
@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        long timeMillis = System.currentTimeMillis();

        // 執行業務代碼
        doSometing(req, resp);

        System.out.println("sync use: " + (System.currentTimeMillis() - timeMillis));
    }

    private void doSometing(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        // 模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        resp.getWriter().append("done");
    }
}

運行結果如下:

sync use: 5000

從運行結果可以看到,業務代碼花了5 秒,但servlet容器的線程幾乎沒有任何耗時。而如果是同步servlet的,線程就會傻等5秒,這5秒內這個線程只處理了這一個請求。

然後我們來看一下異步Servlet的示例代碼:

package org.example.servlet;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @program: servlet-demo
 * @description: 異步的Servlet Demo
 * @author: 01
 * @create: 2018-10-04 17:16
 **/
@WebServlet(asyncSupported = true, urlPatterns = "/AsyncServlet")
public class AsyncServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        long timeMillis = System.currentTimeMillis();

        // 1.開啟異步上下文
        AsyncContext asyncContext = req.startAsync();

        // 2.異步執行業務代碼,放到另一個線程去處理
        CompletableFuture.runAsync(() -> doSometing(asyncContext, asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use: " + (System.currentTimeMillis() - timeMillis));
    }

    private void doSometing(AsyncContext asyncContext, ServletRequest req, ServletResponse resp) {
        // 模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
            resp.getWriter().append("done");
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }

        // 3.業務代碼處理完畢,通知請求結束
        asyncContext.complete();
    }
}

運行結果如下:

async use: 8

可以看到,異步的Servlet不會阻塞Tomcat的線程,異步Servlet可以把耗時的操作交給另一個線程去處理,從而使得Tomcat的線程能夠繼續接收下一個請求。這就是異步Servlet的工作方式,得益於非阻塞的特性,能夠大大提高服務器的吞吐量。


Webflux開發

了解了同步的Servlet和異步Servlet之間的區別以及異步Servlet的工作方式之後,我們就可以開始嘗試使用一下Spring的webflux了。

創建一個Spring Boot工程,選擇如下依賴:
技術分享圖片

關於reactor:

spring webflux是基於reactor來實現響應式的。那麽reactor是什麽呢?我是這樣理解的 reactor = jdk8的stream + jdk9的flow響應式流。理解了這句話,reactor就很容易掌握。reactor裏面Flux和Mono就是stream,它的最終操作就是 subscribe/block 2種。

Reactor中的Mono和Flux:

Flux 和 Mono 是 Reactor 中的兩個基本概念。Flux 表示的是包含 0 到 N 個元素的異步序列。 在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結束的消息和序列出錯的消息。 當消息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。Mono 表示的是包含 0 或者 1 個元素的異步序列。 該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。Flux 和 Mono 之間可以進行轉換。 對一個 Flux 序列進行計數操作,得到的結果是一個 Mono對象。把兩個 Mono 序列合並在一起,得到的是一個 Flux 對象。了解更多>>

我們來看一段代碼,理解一下reactor的概念:

package org.example.spring.webflux;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

/**
 * @program: webflux
 * @description: Reactor Demo
 * @author: 01
 * @create: 2018-10-04 17:58
 **/
public class ReactorDemo {

    public static void main(String[] args) {
        // Mono 0-1個元素
        // Flux 0-N 個元素
        String[] strings = {"1", "2", "3"};

        // 定義訂閱者
        Subscriber<Integer> subscriber = new Subscriber<>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存訂閱關系, 需要用它來給發布者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一個數據, 處理
                System.out.println("接受到數據: " + item);

                // 處理完調用request再請求一個數據
                this.subscription.request(1);

                // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();

                // 我們可以告訴發布者, 後面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數據處理完了(發布者關閉了)
                System.out.println("處理完了!");
            }

        };

        // reactor = jdk8 stream + jdk9 reactive stream
        // 這裏就是jdk8的stream
        Flux.fromArray(strings).map(Integer::parseInt)
                // 最終操作,這裏就是jdk9的reactive stream
                .subscribe(subscriber);
    }
}

在以上例子中,我們可以像JDK9那樣實現訂閱者,並且直接就可以用在reactor的subscribe方法上。調用了subscribe方法就相當於調用了stream的最終操作。有了 reactor = jdk8 stream + jdk9 reactive stream 概念後,在掌握了jdk8的stream和jkd9的flow之後,reactor也不難掌握。

如果對 jdk8 stream 和 jdk9 reactive stream不了解的話,可以參考我另外兩篇文章:

  • Java函數式編程之Stream流編程
  • JDK9特性-Reactive Stream 響應式流

了解了reactor的概念後,我們來編寫一段測試代碼,對比一下webflux的兩種開發方式:

package org.example.spring.webflux.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;

/**
 * @program: webflux
 * @description: webflux demo
 * @author: 01
 * @create: 2018-10-04 17:47
 **/
@Slf4j
@RestController
public class TestController {

    /**
     * 傳統的 spring mvc 開發方式
     */
    @GetMapping("/mvc")
    public String mvc() {
        long timeMillis = System.currentTimeMillis();
        log.info("mvc() start");
        String result = createStr();
        log.info("mvc() end use time {}/ms", System.currentTimeMillis() - timeMillis);

        return result;
    }

    /**
     * spring webflux 的開發方式
     */
    @GetMapping("/webflux")
    public Mono<String> webflux() {
        long timeMillis = System.currentTimeMillis();
        log.info("webflux() start");
        Mono<String> result = Mono.fromSupplier(this::createStr);
        log.info("webflux() end use time {}/ms", System.currentTimeMillis() - timeMillis);

        return result;
    }

    private String createStr() {
        // 模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "some string";
    }
}

訪問/mvc,控制臺輸出日誌如下:
技術分享圖片

訪問/webflux,控制臺輸出日誌如下:
技術分享圖片

以上的例子中,只演示了reactor 裏的mono操作,返回了0-1個元素。以下示例則簡單演示了flux操作,返回0-N個元素,代碼如下:

/**
 * 使用flux,像流一樣返回0-N個元素
 */
@GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> flux() {
    long timeMillis = System.currentTimeMillis();
    log.info("webflux() start");
    Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "flux data--" + i;
    }));
    log.info("webflux() end use time {}/ms", System.currentTimeMillis() - timeMillis);

    return result;
}

訪問/flux接口後,控制臺輸出日誌如下:
技術分享圖片

在瀏覽器上會每隔一秒接收一行數據:
技術分享圖片


SSE(Server-Sent Events)

在上一小節的例子中我們使用flux返回數據時,可以多次返回數據(其實和響應式沒有關系),實際上使用的技術就是H5的SSE。我們學習一個技術,API的使用只是最初級也是最簡單的,更加重要的是需要知其然並知其所以然,否則就只能死記硬背不用就忘!我們不滿足在spring裏面能實現sse效果,更加需要知道spring是如何做到的。

其實SSE很簡單,我們花一點點時間就可以掌握,我們在純servlet環境裏面實現。如下示例:

package org.example.servlet;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * @program: servlet-demo
 * @description: SSE Demo
 * @author: 01
 * @create: 2018-10-04 19:37
 **/
@WebServlet("/ServerSentEventsServlet")
public class ServerSentEventsServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        // 設置返回的數據類型及字符編碼
        resp.setContentType("text/event-stream");
        resp.setCharacterEncoding("UTF-8");

        for (int i = 0; i < 5; i++) {
            // 自定義事件標識(非必須)
            resp.getWriter().write("event:me\n");

            // 需特定格式:data: + 數據 + 2個回車符
            resp.getWriter().write("data:" + i + "\n\n");
            resp.getWriter().flush();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

其中最為關鍵的是 ContentType 需為 "text/event-stream",然後返回的數據符合固定的要求格式即可。

使用瀏覽器訪問如下:
技術分享圖片

如果前端需要進行一些處理的話,我們也可以編寫js代碼來獲取數據,如下示例:

<script type="text/javascript">
    // 初始化sse,參數為url
    var sse = new EventSource("ServerSentEventsServlet");

    // 無自定義事件標識時,通過onmessage事件獲取返回的數據
    sse.onmessage = function (evt) {
        console.log("message", evt.data, evt)
    };

    // 若有自定義的事件標識時,通過添加事件監聽獲取返回的數據
    sse.addEventListener("me", function (evt) {
        console.log("message", evt.data);
        if (evt.data === 3) {
            // 關閉sse
            sse.close()
        }
    });
</script>

Spring船新版推出的WebFlux,是兄弟就來學我