實戰基於Spring Boot 2的WebFlux和mLab搭建反應式Web
Spring Framework 5帶來了新的Reactive Stack非阻塞式Web框架:Spring WebFlux。作為與Spring MVC並行使用的Web框架,Spring WebFlux依賴了反應式流介面卡(Reactive Streams Adapter),在Netty和Servlet3.1的容器下,可以提供非阻塞式的Web服務,充分發揮下一代多核處理器的優勢,支撐海量的併發訪問。
以上是官網的介紹,事實上在基於Spring Boot 2強大的微服務架構幫助下,WebFlux和Spring MVC一起,成為Java應用開發的兩大選擇,可以讓我們迅速地搭建起反應式的Web應用。本文擬通過模擬一個簡單的微博應用,實戰通過Spring Boot 2+ Spring WebFlux + MongoDB 開發一個Web應用。
Spring WebFlux及其程式設計正規化
Spring WebFlux通過核心庫Reactor提供反應式支援,Reactor實現了Reactive Streams,後者是一個帶非阻塞式背壓的非同步流處理器。
Reactor包含兩個重要的成員變數Flux和Mono,它們都實現了Reactive Streams提供的
Publisher介面
. Flux
是一個代表了0..N元素的流,Mono
是代表了一個0..1元素的流。雖然WebFlux使用Reactor作為它的核心依賴,它在應用層面,它也同時支援RxJava。
Spring WebFlux支援兩種型別的程式設計正規化:
-
傳統的基於註解的方式,如@Controller、@RequestMapping等沿用了Spring MVC的模式.
-
基於Java8的Lambda函數語言程式設計模式
本文主要是使用基於註解的方式,今後另文補充基於函數語言程式設計的正規化。
基於Spring Boot 2+ Spring WebFlux + MongoDB的輕量級微博應用
以下展示如何搭建一個輕量級的微博應用,這個應用只包括一個domain類Tweet,使用基於MongoDB的線上MongoDB資料庫mLab作為儲存,並且使用非同步的RESTful API提供基本的增刪查改功能。
此外還會用到Spring Test元件,通過使用Maven的外掛功能,實現對微服務應用的測試。
1. 新建專案
- 選擇2.x以上的Spring Boot版本
- 輸入artifact的值,比如webflux-demo
- 選擇Reactive Web和Reactive MongoDB依賴
- 點選Generate Project,生成並下載一個微服務框架到本地,並解壓
- 使用IDE,比如eclipse,匯入解壓出來的專案檔案
2. 註冊mLab賬戶,並新建一個MongoDB資料庫
MongoDB資料庫是常用的文件型別資料庫,廣泛用於社交網站、電商等引用中。而mLab是一個線上MongoDB資料庫平臺,提供MongoDB的線上服務。這個應用使用到它。
- 根據要求註冊賬戶
- 網站會有免費和收費的服務選擇,選擇AWS的免費MongoDB服務
- 服務選擇完畢,平臺會提供一個數據庫映象,可以點選資料庫前往管理頁面。
- 在User標籤下,新建資料庫的登入名和密碼。
完成以上步驟,資料庫就可以開始使用了。你會看到如下圖所示的頁面:
3. 在專案中配置MongoDB資料庫
前往IDE中的專案資原始檔夾,找到application.properties。新增你在mLad的MongoDB URI
spring.data.mongodb.uri=mongodb://username:[email protected]:63439/springdb
在應用啟動的時候,Springboot會自動讀取該配置檔案。
4. 編寫應用各模組
WebFlux可以認為是基於Spring的Web開發的一個新的模式或選擇,因此它既有Spring MVC有的模組如Domain、Controller、Service,也有新增的如Handler、Router等。下面分別編寫各模組。
4.1 Domain包
Domain包只包括一個domain類Tweet.java,因為使用了文件資料庫,因此使用@Document註解修飾類,並且使用@Id修飾成員變數id。程式碼如下:
1 package com.example.webfluxdemo.model; 2 3 import java.util.Date; 4 5 import javax.validation.constraints.NotBlank; 6 import javax.validation.constraints.NotNull; 7 import javax.validation.constraints.Size; 8 9 import org.springframework.data.annotation.Id; 10 import org.springframework.data.mongodb.core.mapping.Document; 11 12 @Document(collection = "tweets") 13 public class Tweet { 14 @Id 15 private String id; 16 17 @NotBlank 18 @Size(max = 140) 19 private String text; 20 21 @NotNull 22 private Date createAt = new Date(); 23 24 public Tweet() { 25 26 } 27 28 public Tweet(String text) { 29 this.text = text; 30 } 31 32 public String getId() { 33 return id; 34 } 35 36 public void setId(String id) { 37 this.id = id; 38 } 39 40 public String getText() { 41 return text; 42 } 43 44 public void setText(String text) { 45 this.text = text; 46 } 47 48 public Date getCreateAt() { 49 return createAt; 50 } 51 52 public void setCreateAt(Date createAt) { 53 this.createAt = createAt; 54 } 55 56 }
4.2 Repository
Repository介面是DAO,繼承了ReactiveMongoRepository介面用於連線MongoDB資料庫做資料持久化,
1 package com.example.webfluxdemo.repository; 2 3 import org.springframework.data.mongodb.repository.ReactiveMongoRepository; 4 import org.springframework.stereotype.Repository; 5 6 import com.example.webfluxdemo.model.Tweet; 7 8 @Repository 9 public interface TweetRepository extends ReactiveMongoRepository<Tweet, String> { 10 11 }
通過檢視原始碼可知,父介面ReactiveMongoRepository包含對MongoDB資料庫基本的增刪改查方法。在執行時,Spring Boot會自動實現一個SimpleReactiveMongoRepository類,用於執行增刪改查方法。這樣極大地節省了程式設計師持久化的精力,可以專注於業務開發。
4.3 Controller
Controller是WebFlux的核心類,程式碼如下:
1 package com.example.webfluxdemo.controller; 2 3 import javax.validation.Valid; 4 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.http.HttpStatus; 7 import org.springframework.http.MediaType; 8 import org.springframework.http.ResponseEntity; 9 import org.springframework.web.bind.annotation.DeleteMapping; 10 import org.springframework.web.bind.annotation.GetMapping; 11 import org.springframework.web.bind.annotation.PathVariable; 12 import org.springframework.web.bind.annotation.PostMapping; 13 import org.springframework.web.bind.annotation.PutMapping; 14 import org.springframework.web.bind.annotation.RequestBody; 15 import org.springframework.web.bind.annotation.RestController; 16 17 import com.example.webfluxdemo.model.Tweet; 18 import com.example.webfluxdemo.repository.TweetRepository; 19 20 import reactor.core.publisher.Flux; 21 import reactor.core.publisher.Mono; 22 23 @RestController 24 public class TweetController { 25 26 @Autowired 27 private TweetRepository tweetRepository; 28 29 @GetMapping("/tweets") 30 public Flux<Tweet> getAllTweets(){ 31 return tweetRepository.findAll(); 32 } 33 34 @PostMapping("/tweets") 35 public Mono<Tweet> createTweets(@Valid @RequestBody Tweet tweet){ 36 return tweetRepository.save(tweet); 37 } 38 39 @GetMapping("/tweets/{id}") 40 public Mono<ResponseEntity<Tweet>> getTweetById(@PathVariable(value = "id") String tweetId) { 41 return tweetRepository.findById(tweetId) 42 .map(savedTweet -> ResponseEntity.ok(savedTweet)) 43 .defaultIfEmpty(ResponseEntity.notFound().build()); 44 } 45 46 @PutMapping("/tweets/{id}") 47 public Mono<ResponseEntity<Tweet>> updateTweet(@PathVariable(value = "id") String tweetId, 48 @Valid @RequestBody Tweet tweet) { 49 return tweetRepository.findById(tweetId) 50 .flatMap(existingTweet -> { 51 existingTweet.setText(tweet.getText()); 52 return tweetRepository.save(existingTweet); 53 }) 54 .map(updatedTweet -> new ResponseEntity<>(updatedTweet, HttpStatus.OK)) 55 .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND)); 56 } 57 58 @DeleteMapping("/tweets/{id}") 59 public Mono<ResponseEntity<Void>> deleteTweet(@PathVariable(value = "id") String tweetId) { 60 61 return tweetRepository.findById(tweetId) 62 .flatMap(existingTweet -> 63 tweetRepository.delete(existingTweet) 64 .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))) 65 ) 66 .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND)); 67 } 68 69 // 基於反應式流傳送微博至客戶端 70 @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE) 71 public Flux<Tweet> streamAllTweets() { 72 return tweetRepository.findAll(); 73 } 74 75 }
Controller使用Flux或Mono作為物件,返回給不同的請求。反應式編碼主要在最後一個方法:
// 基於反應式流傳送微博至客戶端 @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<Tweet> streamAllTweets() { return tweetRepository.findAll(); }
這個方法和getAllTweet方法一樣,會返回一個JSON流到客戶端,區別在於streamAllTweets以Server-send-event的方式返回一個Json流到瀏覽器,這種流可以被瀏覽器識別和使用。
使用WebTestClient測試應用
WebTestClient是Spring 5提供的一個非同步反應式Http客戶端,可以用於測試反應式的RestFul微服務應用。在IDE的測試資料夾中,可以找到測試類,編寫程式碼如下:
1 package com.example.webfluxdemo; 2 3 import java.util.Collections; 4 5 import org.assertj.core.api.Assertions; 6 import org.junit.Test; 7 import org.junit.runner.RunWith; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.boot.test.context.SpringBootTest; 10 import org.springframework.http.MediaType; 11 import org.springframework.test.context.junit4.SpringRunner; 12 import org.springframework.test.web.reactive.server.WebTestClient; 13 14 import com.example.webfluxdemo.model.Tweet; 15 import com.example.webfluxdemo.repository.TweetRepository; 16 17 import reactor.core.publisher.Mono; 18 19 @RunWith(SpringRunner.class) 20 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) 21 public class WebfluxDemoApplicationTests { 22 23 @Autowired 24 private WebTestClient webTestClient; 25 26 @Autowired 27 TweetRepository tweetRepository; 28 29 @Test 30 public void testCreateTweet() { 31 Tweet tweet = new Tweet("這是一條測試微博"); 32 33 webTestClient.post().uri("/tweets") 34 .contentType(MediaType.APPLICATION_JSON_UTF8) 35 .accept(MediaType.APPLICATION_JSON_UTF8) 36 .body(Mono.just(tweet), Tweet.class) 37 .exchange() 38 .expectStatus().isOk() 39 .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) 40 .expectBody() 41 .jsonPath("$.id").isNotEmpty() 42 .jsonPath("$.text").isEqualTo("這是一條測試微博"); 43 } 44 45 @Test 46 public void testGetAllTweets() { 47 webTestClient.get().uri("/tweets") 48 .accept(MediaType.APPLICATION_JSON_UTF8) 49 .exchange() 50 .expectStatus().isOk() 51 .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) 52 .expectBodyList(Tweet.class); 53 } 54 55 @Test 56 public void testGetSingleTweet() { 57 Tweet tweet = tweetRepository.save(new Tweet("Hello, World!")).block(); 58 59 webTestClient.get() 60 .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId())) 61 .exchange() 62 .expectStatus().isOk() 63 .expectBody() 64 .consumeWith(response -> 65 Assertions.assertThat(response.getResponseBody()).isNotNull()); 66 } 67 68 @Test 69 public void testUpdateTweet() { 70 Tweet tweet = tweetRepository.save(new Tweet("Initial Tweet")).block(); 71 72 Tweet newTweetData = new Tweet("更新微博"); 73 74 webTestClient.put() 75 .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId())) 76 .contentType(MediaType.APPLICATION_JSON_UTF8) 77 .accept(MediaType.APPLICATION_JSON_UTF8) 78 .body(Mono.just(newTweetData), Tweet.class) 79 .exchange() 80 .expectStatus().isOk() 81 .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) 82 .expectBody() 83 .jsonPath("$.text").isEqualTo("更新微博"); 84 } 85 86 @Test 87 public void testDeleteTweet() { 88 Tweet tweet = tweetRepository.save(new Tweet("將要被刪除的微博")).block(); 89 90 webTestClient.delete() 91 .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId())) 92 .exchange() 93 .expectStatus().isOk(); 94 } 95 96 }
使用mvn test命令,測試所有的測試類。結果如下:
檢視mLab的資料庫,資料被成功新增: