1. 程式人生 > >WebFlux響應式程式設計基礎之 5 webflux服務端開發講解

WebFlux響應式程式設計基礎之 5 webflux服務端開發講解

Spring5 非組塞的開發模式

SpringMvc 與 SpringWebFlux 對比
學習工作機制 工作思想 更加重要
這裡寫圖片描述
Netty 很重要 讀一下 Netty原始碼

先垂直擴充套件 –》 後水平擴充套件

5-2 非同步servlet

這裡寫圖片描述

問題: 1同步servlet阻塞了什麼?
答案: 阻塞了Tomcat容器的servlet執行緒


import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletException;
import
javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class SyncServlet */ @WebServlet("/SyncServlet") public class SyncServlet extends HttpServlet { private
static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public SyncServlet() { super(); // TODO Auto-generated constructor stub } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { long t1 = System.currentTimeMillis(); // 執行業務程式碼 doSomeThing(request, response); System.out.println("sync use:" + (System.currentTimeMillis() - t1)); } private void doSomeThing(HttpServletRequest request, HttpServletResponse response) throws IOException { // 模擬耗時操作 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { } // response.getWriter().append("done"); } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // TODO Auto-generated method stub doGet(request, response); } }

問題: 2非同步servlet是怎麼樣工作的呢?
答案: 執行緒池,另外一個執行緒去處理耗時的操作
答案: 同步和非同步對於瀏覽器都是一樣的,耗時是一樣的,同步非同步僅僅是對於後臺來說的


import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class AsyncServlet
 */
@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public AsyncServlet() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 開啟非同步
        AsyncContext asyncContext = request.startAsync();

        // 執行業務程式碼
        CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
                asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use:" + (System.currentTimeMillis() - t1));
    }

    private void doSomeThing(AsyncContext asyncContext,
            ServletRequest servletRequest, ServletResponse servletResponse) {

        // 模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }

        //
        try {
            servletResponse.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 業務程式碼處理完畢, 通知結束
        asyncContext.complete();
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }

}

5-3 webflux開發-1
這裡寫圖片描述

5-5 server-sent events
SSE H5
這裡寫圖片描述


import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class SSE
 */
@WebServlet("/SSE")
public class SSE extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public SSE() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        for (int i = 0; i < 5; i++) {
            // 指定事件標識
            response.getWriter().write("event:me\n");
            // 格式: data: + 資料 + 2個回車
            response.getWriter().write("data:" + i + "\n\n");
            response.getWriter().flush();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        }

    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }

}

可以去看官方文件,官方文件是正解,畢竟輪子是別人創造的

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>

    <script type="text/javascript">
        // 初始化, 引數為url
        // 依賴H5
        var sse = new EventSource("SSE");

        sse.onmessage = function(e) {
            console.log("message", e.data, e);
        }

        // 監聽指定事件, (就不會進入onmessage了)
        sse.addEventListener("me", function(e) {
            console.log("me event", e.data);
            // 如果不關閉,會自動重連
            if (e.data == 3) {
                sse.close();
            }
        });
    </script>
</body>
</html>

SSE 與WebSocket的區別

**5-6 完整例子**
**MongoDB 的資料儲存格式  **
![這裡寫圖片描述](https://img-blog.csdn.net/20180803093140534?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)

import javax.validation.constraints.NotBlank;
import org.hibernate.validator.constraints.Range;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
// MongoDB的表
@Document(collection = “user”)
@Data
public class User {

@Id
private String id;

@NotBlank
private String name;

@Range(min=10, max=100)
private int age;

}

package com.imooc.controller;

import javax.validation.Valid;

import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.imooc.domain.User;
import com.imooc.repository.UserRepository;
import com.imooc.util.CheckUtil;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping(“/user”)
public class UserController {

private final UserRepository repository;

public UserController(UserRepository repository) {
    this.repository = repository;
}

/**
 * 以陣列形式一次性返回資料
 * 
 * @return
 */
@GetMapping("/")
public Flux<User> getAll() {
    return repository.findAll();
}

/**
 * 以SSE形式多次返回資料
 * 
 * @return
 */
@GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamGetAll() {
    return repository.findAll();
}

/**
 * 新增資料
 * 
 * @param user
 * @return
 */
@PostMapping("/")
public Mono<User> createUser(@Valid @RequestBody User user) {
    // spring data jpa 裡面, 新增和修改都是save. 有id是修改, id為空是新增
    // 根據實際情況是否置空id
    user.setId(null);
    CheckUtil.checkName(user.getName());
    return this.repository.save(user);
}

/**
 * 根據id刪除使用者 存在的時候返回200, 不存在返回404
 * 
 * @param id
 * @return
 */
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> deleteUser(@PathVariable("id") String id) {
    // deletebyID 沒有返回值, 不能判斷資料是否存在
    // this.repository.deleteById(id)
    return this.repository.findById(id)
            // 當你要操作資料, 並返回一個Mono 這個時候使用flatMap
            // 如果不操作資料, 只是轉換資料, 使用map
            .flatMap(user -> this.repository.delete(user).then(
                    Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
            .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}

/**
 * 修改資料 存在的時候返回200 和修改後的資料, 不存在的時候返回404
 * 
 * @param id
 * @param user
 * @return
 */
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id,
        @Valid @RequestBody User user) {
    CheckUtil.checkName(user.getName());
    return this.repository.findById(id)
            // flatMap 操作資料
            .flatMap(u -> {
                u.setAge(user.getAge());
                u.setName(user.getName());
                return this.repository.save(u);
            })
            // map: 轉換資料
            .map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
            .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}

/**
 * 根據ID查詢使用者 存在返回使用者資訊, 不存在返回404
 * 
 * @param id
 * @return
 */
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> findUserById(
        @PathVariable("id") String id) {
    return this.repository.findById(id)
            .map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
            .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}

/**
 * 根據年齡查詢使用者
 * 
 * @param start
 * @param end
 * @return
 */
@GetMapping("/age/{start}/{end}")
public Flux<User> findByAge(@PathVariable("start") int start,
        @PathVariable("end") int end) {
    return this.repository.findByAgeBetween(start, end);
}

/**
 * 根據年齡查詢使用者
 * 
 * @param start
 * @param end
 * @return
 */
@GetMapping(value = "/stream/age/{start}/{end}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamFindByAge(@PathVariable("start") int start,
        @PathVariable("end") int end) {
    return this.repository.findByAgeBetween(start, end);
}

/**
 *  得到20-30使用者
 * @return
 */
@GetMapping("/old")
public Flux<User> oldUser() {
    return this.repository.oldUser();
}

/**
 * 得到20-30使用者
 * 
 * @return
 */
@GetMapping(value = "/stream/old", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamOldUser() {
    return this.repository.oldUser();
}

}

![這裡寫圖片描述](https://img-blog.csdn.net/20180803093809636?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
**Spring官方網站推薦用構造方法注入  但是也不方便啊 新增一個 減少一個 怎麼辦呢?**
![這裡寫圖片描述](https://img-blog.csdn.net/20180803093626270?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)

**5-9 完整例子-引數校驗**
[原始碼連結,請點選我](https://github.com/linliangxuan/SpringBoot2.0-WebFlux-/tree/master)

 spring data jpa 裡面, 新增和修改都是save. 有id是修改, id為空是新增

**5-10 RouterFunction模式-1**
![這裡寫圖片描述](https://img-blog.csdn.net/20180803145043925?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
![這裡寫圖片描述](https://img-blog.csdn.net/20180803145110341?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)

package com.imooc.handlers;

import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.notFound;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import com.imooc.domain.User;
import com.imooc.repository.UserRepository;
import com.imooc.util.CheckUtil;

import reactor.core.publisher.Mono;

@Component
public class UserHandler {

private final UserRepository repository;

public UserHandler(UserRepository rep) {
    this.repository = rep;
}

/**
 * 得到所有使用者
 * 
 * @param request
 * @return
 */
public Mono<ServerResponse> getAllUser(ServerRequest request) {
    return ok().contentType(APPLICATION_JSON_UTF8)
            .body(this.repository.findAll(), User.class);
}

/**
 * 建立使用者
 * 
 * @param request
 * @return
 */
public Mono<ServerResponse> createUser(ServerRequest request) {
    // 2.0.0 是可以工作, 但是2.0.1 下面這個模式是會報異常
    Mono<User> user = request.bodyToMono(User.class);

    return user.flatMap(u -> {
        // 校驗程式碼需要放在這裡
        CheckUtil.checkName(u.getName());

        return ok().contentType(APPLICATION_JSON_UTF8)
                .body(this.repository.save(u), User.class);
    });
}

/**
 * 根據id刪除使用者
 * 
 * @param request
 * @return
 */
public Mono<ServerResponse> deleteUserById(ServerRequest request) {
    String id = request.pathVariable("id");

    return this.repository.findById(id)
            .flatMap(
                    user -> this.repository.delete(user).then(ok().build()))
            .switchIfEmpty(notFound().build());
}

}

package com.imooc.routers;

import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RequestPredicates.path;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import com.imooc.handlers.UserHandler;

@Configuration
public class AllRouters {

@Bean
RouterFunction<ServerResponse> userRouter(UserHandler handler) {
    return nest(
            // 相當於類上面的 @RequestMapping("/user")
            path("/user"),
            // 下面的相當於類裡面的 @RequestMapping
            // 得到所有使用者
            route(GET("/"), handler::getAllUser)
            // 建立使用者
            .andRoute(POST("/").and(accept(MediaType.APPLICATION_JSON_UTF8)),
                            handler::createUser)
            // 刪除使用者
            .andRoute(DELETE("/{id}"), handler::deleteUserById));
}

}

“`

5-11 RouterFunction模式-2