WebFlux響應式程式設計基礎之 5 webflux服務端開發講解
Spring5 非組塞的開發模式
SpringMvc 與 SpringWebFlux 對比
學習工作機制 工作思想 更加重要
Netty 很重要 讀一下 Netty原始碼
先垂直擴充套件 –》 後水平擴充套件
5-2 非同步servlet
問題: 1同步servlet阻塞了什麼?
答案: 阻塞了Tomcat容器的servlet執行緒
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Servlet implementation class SyncServlet
*/
@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public SyncServlet() {
super();
// TODO Auto-generated constructor stub
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
long t1 = System.currentTimeMillis();
// 執行業務程式碼
doSomeThing(request, response);
System.out.println("sync use:" + (System.currentTimeMillis() - t1));
}
private void doSomeThing(HttpServletRequest request,
HttpServletResponse response) throws IOException {
// 模擬耗時操作
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
//
response.getWriter().append("done");
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
doGet(request, response);
}
}
問題: 2非同步servlet是怎麼樣工作的呢?
答案: 執行緒池,另外一個執行緒去處理耗時的操作
答案: 同步和非同步對於瀏覽器都是一樣的,耗時是一樣的,同步非同步僅僅是對於後臺來說的
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Servlet implementation class AsyncServlet
*/
@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public AsyncServlet() {
super();
// TODO Auto-generated constructor stub
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
long t1 = System.currentTimeMillis();
// 開啟非同步
AsyncContext asyncContext = request.startAsync();
// 執行業務程式碼
CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
asyncContext.getRequest(), asyncContext.getResponse()));
System.out.println("async use:" + (System.currentTimeMillis() - t1));
}
private void doSomeThing(AsyncContext asyncContext,
ServletRequest servletRequest, ServletResponse servletResponse) {
// 模擬耗時操作
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
//
try {
servletResponse.getWriter().append("done");
} catch (IOException e) {
e.printStackTrace();
}
// 業務程式碼處理完畢, 通知結束
asyncContext.complete();
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
doGet(request, response);
}
}
5-3 webflux開發-1
5-5 server-sent events
SSE H5
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Servlet implementation class SSE
*/
@WebServlet("/SSE")
public class SSE extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public SSE() {
super();
// TODO Auto-generated constructor stub
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
for (int i = 0; i < 5; i++) {
// 指定事件標識
response.getWriter().write("event:me\n");
// 格式: data: + 資料 + 2個回車
response.getWriter().write("data:" + i + "\n\n");
response.getWriter().flush();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
doGet(request, response);
}
}
可以去看官方文件,官方文件是正解,畢竟輪子是別人創造的
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
<script type="text/javascript">
// 初始化, 引數為url
// 依賴H5
var sse = new EventSource("SSE");
sse.onmessage = function(e) {
console.log("message", e.data, e);
}
// 監聽指定事件, (就不會進入onmessage了)
sse.addEventListener("me", function(e) {
console.log("me event", e.data);
// 如果不關閉,會自動重連
if (e.data == 3) {
sse.close();
}
});
</script>
</body>
</html>
SSE 與WebSocket的區別
**5-6 完整例子**
**MongoDB 的資料儲存格式 **
![這裡寫圖片描述](https://img-blog.csdn.net/20180803093140534?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
import javax.validation.constraints.NotBlank;
import org.hibernate.validator.constraints.Range;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
// MongoDB的表
@Document(collection = “user”)
@Data
public class User {
@Id
private String id;
@NotBlank
private String name;
@Range(min=10, max=100)
private int age;
}
package com.imooc.controller;
import javax.validation.Valid;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.imooc.domain.User;
import com.imooc.repository.UserRepository;
import com.imooc.util.CheckUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping(“/user”)
public class UserController {
private final UserRepository repository;
public UserController(UserRepository repository) {
this.repository = repository;
}
/**
* 以陣列形式一次性返回資料
*
* @return
*/
@GetMapping("/")
public Flux<User> getAll() {
return repository.findAll();
}
/**
* 以SSE形式多次返回資料
*
* @return
*/
@GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamGetAll() {
return repository.findAll();
}
/**
* 新增資料
*
* @param user
* @return
*/
@PostMapping("/")
public Mono<User> createUser(@Valid @RequestBody User user) {
// spring data jpa 裡面, 新增和修改都是save. 有id是修改, id為空是新增
// 根據實際情況是否置空id
user.setId(null);
CheckUtil.checkName(user.getName());
return this.repository.save(user);
}
/**
* 根據id刪除使用者 存在的時候返回200, 不存在返回404
*
* @param id
* @return
*/
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> deleteUser(@PathVariable("id") String id) {
// deletebyID 沒有返回值, 不能判斷資料是否存在
// this.repository.deleteById(id)
return this.repository.findById(id)
// 當你要操作資料, 並返回一個Mono 這個時候使用flatMap
// 如果不操作資料, 只是轉換資料, 使用map
.flatMap(user -> this.repository.delete(user).then(
Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 修改資料 存在的時候返回200 和修改後的資料, 不存在的時候返回404
*
* @param id
* @param user
* @return
*/
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id,
@Valid @RequestBody User user) {
CheckUtil.checkName(user.getName());
return this.repository.findById(id)
// flatMap 操作資料
.flatMap(u -> {
u.setAge(user.getAge());
u.setName(user.getName());
return this.repository.save(u);
})
// map: 轉換資料
.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 根據ID查詢使用者 存在返回使用者資訊, 不存在返回404
*
* @param id
* @return
*/
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> findUserById(
@PathVariable("id") String id) {
return this.repository.findById(id)
.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 根據年齡查詢使用者
*
* @param start
* @param end
* @return
*/
@GetMapping("/age/{start}/{end}")
public Flux<User> findByAge(@PathVariable("start") int start,
@PathVariable("end") int end) {
return this.repository.findByAgeBetween(start, end);
}
/**
* 根據年齡查詢使用者
*
* @param start
* @param end
* @return
*/
@GetMapping(value = "/stream/age/{start}/{end}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamFindByAge(@PathVariable("start") int start,
@PathVariable("end") int end) {
return this.repository.findByAgeBetween(start, end);
}
/**
* 得到20-30使用者
* @return
*/
@GetMapping("/old")
public Flux<User> oldUser() {
return this.repository.oldUser();
}
/**
* 得到20-30使用者
*
* @return
*/
@GetMapping(value = "/stream/old", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamOldUser() {
return this.repository.oldUser();
}
}
![這裡寫圖片描述](https://img-blog.csdn.net/20180803093809636?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
**Spring官方網站推薦用構造方法注入 但是也不方便啊 新增一個 減少一個 怎麼辦呢?**
![這裡寫圖片描述](https://img-blog.csdn.net/20180803093626270?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
**5-9 完整例子-引數校驗**
[原始碼連結,請點選我](https://github.com/linliangxuan/SpringBoot2.0-WebFlux-/tree/master)
spring data jpa 裡面, 新增和修改都是save. 有id是修改, id為空是新增
**5-10 RouterFunction模式-1**
![這裡寫圖片描述](https://img-blog.csdn.net/20180803145043925?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
![這裡寫圖片描述](https://img-blog.csdn.net/20180803145110341?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RndXRsaWFuZ3h1YW4=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
package com.imooc.handlers;
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.notFound;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.imooc.domain.User;
import com.imooc.repository.UserRepository;
import com.imooc.util.CheckUtil;
import reactor.core.publisher.Mono;
@Component
public class UserHandler {
private final UserRepository repository;
public UserHandler(UserRepository rep) {
this.repository = rep;
}
/**
* 得到所有使用者
*
* @param request
* @return
*/
public Mono<ServerResponse> getAllUser(ServerRequest request) {
return ok().contentType(APPLICATION_JSON_UTF8)
.body(this.repository.findAll(), User.class);
}
/**
* 建立使用者
*
* @param request
* @return
*/
public Mono<ServerResponse> createUser(ServerRequest request) {
// 2.0.0 是可以工作, 但是2.0.1 下面這個模式是會報異常
Mono<User> user = request.bodyToMono(User.class);
return user.flatMap(u -> {
// 校驗程式碼需要放在這裡
CheckUtil.checkName(u.getName());
return ok().contentType(APPLICATION_JSON_UTF8)
.body(this.repository.save(u), User.class);
});
}
/**
* 根據id刪除使用者
*
* @param request
* @return
*/
public Mono<ServerResponse> deleteUserById(ServerRequest request) {
String id = request.pathVariable("id");
return this.repository.findById(id)
.flatMap(
user -> this.repository.delete(user).then(ok().build()))
.switchIfEmpty(notFound().build());
}
}
package com.imooc.routers;
import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RequestPredicates.path;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.imooc.handlers.UserHandler;
@Configuration
public class AllRouters {
@Bean
RouterFunction<ServerResponse> userRouter(UserHandler handler) {
return nest(
// 相當於類上面的 @RequestMapping("/user")
path("/user"),
// 下面的相當於類裡面的 @RequestMapping
// 得到所有使用者
route(GET("/"), handler::getAllUser)
// 建立使用者
.andRoute(POST("/").and(accept(MediaType.APPLICATION_JSON_UTF8)),
handler::createUser)
// 刪除使用者
.andRoute(DELETE("/{id}"), handler::deleteUserById));
}
}
“`
5-11 RouterFunction模式-2