Reactive APIs with WebFlux: trying something different

First, what does "reactive" mean?

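        Reactive programming is a programming paradigm built around data streams and the propagation of change. Put plainly: values that change are propagated to interested parties through a data stream.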
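
To make "propagating changing values through a data stream" concrete, here is a minimal sketch that uses only the JDK's own reactive-streams interfaces (java.util.concurrent.Flow), not Reactor or WebFlux. The class and method names (ChangeStreamDemo, RecordingSubscriber, propagate) are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ChangeStreamDemo {

    /** Subscriber that records every value pushed down the stream. */
    static class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first value
        }

        @Override
        public void onNext(Integer value) {
            received.add(value);     // react to the new value
            subscription.request(1); // then ask for the next one
        }

        @Override
        public void onError(Throwable error) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    /** Pushes each value into a stream and returns what the subscriber saw. */
    static List<Integer> propagate(int... values) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        // try-with-resources closes the publisher, which completes the stream
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int value : values) {
                publisher.submit(value);
            }
        }
        try {
            // delivery is asynchronous, so wait for the completion signal
            subscriber.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(propagate(10, 12, 9));
    }
}
```

The producer never calls the consumer directly: it only submits values, and the subscriber reacts as each one arrives. Reactor's Flux and Mono follow the same publisher/subscriber contract with a much richer set of operators.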

So what is WebFlux?

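        The WebFlux module is named spring-webflux; the "Flux" in the name comes from the Flux class in Reactor. Spring WebFlux is a non-blocking reactive web framework that also offers a functional programming style. It can be used to build asynchronous, non-blocking, event-driven services, and it scales very well.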

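        The spring-webflux module includes support for reactive HTTP and WebSocket clients, as well as for REST, HTML, and WebSocket-style interactions. Broadly speaking, Spring MVC is used for synchronous processing and Spring WebFlux for asynchronous processing.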

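        Spring Boot WebFlux offers two programming models: one uses annotations, much like Spring MVC, and the other is a functional style based on router and handler functions. Both are built on Reactor's reactive types.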

Hands-on

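    Searching online, I found that the data access (DAL) starters currently available include:
    spring-boot-starter-data-redis-reactive and spring-boot-starter-data-mongodb-reactive
    There may be others. What I really wanted was a spring-boot-starter-data-mysql-reactive, but no such starter exists. So let's walk through the two packages above.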

spring-boot-starter-data-redis-reactive

Dependencies used

        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-pool2</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>com.github.flying-cattle</groupId>
			<artifactId>mybatis-dsc-generator</artifactId>
			<version>${mybatis-dsc-generator.version}</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson.version}</version>
		</dependency>

YAML configuration

server:
  port: 8080
spring:
  application:
    name: webFlux-test
  redis:
    host: 127.0.0.1
    port: 6379
    password: pwd2020
    timeout: 5000
    lettuce:
      pool:
        max-active: 200 
        max-idle: 20 
        min-idle: 5 
        max-wait: 1000 

Integrating redis-reactive

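        Although the package is a starter, it still needed some configuration of my own before it worked. Without it, startup fails with the following error: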

Description:

Field redisTemplate in com.flying.cattle.wf.service.impl.RedisServiceImpl required a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' that could not be found.

The injection point has the following annotations:
	- @org.springframework.beans.factory.annotation.Autowired(required=true)


Action:

Consider defining a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' in your configuration.

After checking the official documentation, I added the following bean:

@Bean
	public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
		ReactiveRedisTemplate<String, String> reactiveRedisTemplate = new ReactiveRedisTemplate<>(factory,RedisSerializationContext.string());
		return reactiveRedisTemplate;
	}

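Notice that this is a ReactiveRedisTemplate<String, String>, which is not very convenient. I originally wanted to declare it as ReactiveRedisTemplate<String, Serializable>, but after fiddling with it for a while I could not get it to work. If anyone has a good solution, I would appreciate some pointers.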
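
One possible way to avoid the String-only template is sketched below. It is a configuration fragment, not something taken from the original project. It declares a template typed to the User entity and lets Jackson handle the value serialization, rather than using Serializable. It assumes jackson-databind is on the classpath (the WebFlux starter normally brings it in). The class name UserRedisConfig and the bean name userRedisTemplate are made up for this example.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import com.flying.cattle.wf.entity.User;

@Configuration
public class UserRedisConfig { // hypothetical class name

    @Bean
    public ReactiveRedisTemplate<String, User> userRedisTemplate(ReactiveRedisConnectionFactory factory) {
        // Values are stored as JSON and mapped back to User on read
        Jackson2JsonRedisSerializer<User> valueSerializer = new Jackson2JsonRedisSerializer<>(User.class);

        // Keys stay plain strings; only the value serializer is replaced
        RedisSerializationContext<String, User> context = RedisSerializationContext
                .<String, User>newSerializationContext(new StringRedisSerializer())
                .value(valueSerializer)
                .build();

        return new ReactiveRedisTemplate<>(factory, context);
    }
}
```

With a template like this, the service could return Mono<User> directly and drop the manual JSON.toJSONString calls. Whether that fits depends on how the rest of the project reads the stored values.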

Service code:

@Service
public class RedisServiceImpl implements RedisService {

	@Autowired
	private ReactiveRedisTemplate<String, String> redisTemplate;
	
	@Override
	public Mono<String> getById(String key) {
		// TODO Auto-generated method stub
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.get(key);
	}

	@Override
	public Mono<String> addUser(String key,User user) {
		// TODO Auto-generated method stub
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.getAndSet(key, JSON.toJSONString(user));
	}

	@Override
	public Mono<Boolean> deleteById(String key) {
		// TODO Auto-generated method stub
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.delete(key);
	}

	@Override
	public Mono<String> updateById(String key,User user) {
		// TODO Auto-generated method stub
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.getAndSet(key, JSON.toJSONString(user));
	}

	@Override
	public Flux<String> findAll(String key) {
		// TODO Auto-generated method stub
		ReactiveListOperations<String, String> operations = redisTemplate.opsForList();
		return operations.range(key, 0, -1);
	}


	@Override
	public Mono<Long> addlist(String key,List<String> list) {
		// TODO Auto-generated method stub
		ReactiveListOperations<String, String> operations = redisTemplate.opsForList();
		return operations.leftPushAll(key, list);
	}
	
	@Override
	public Flux<String> findUsers(String key) {
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return redisTemplate.keys(key).flatMap(keyId ->operations.get(keyId));
	}
}

Controller code

@RestController
@RequestMapping("/user")
public class UserController {
	
	public final static String USER_KEY="user";
	
	@Autowired
	private RedisService redisService;
	
	@Autowired
	private RedisGenerateId redisGenerateId;
	
	@GetMapping("/getId")
	public Long getUserId(){
		return redisGenerateId.generate(USER_KEY);
		
	}
	
	public String getKey(Long id) {
		return USER_KEY+"_"+id;
	}
	
	@GetMapping("/getById/{id}")
	public Mono<String> getUserById(@PathVariable("id")Long id){
		return redisService.getById(getKey(id));
	}
	
	@GetMapping("/add")
	public Mono<String> add(User user){
		user = new User();
		user.setAccount("admin1");
		user.setPassword("123123");
		user.setNickname("admin");
		user.setEmail("[email protected]");
		user.setPhone("13666275002");
		user.setSex(true);
		String bd="1990-01-01";
		DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
		try {
			user.setBirthday(fmt.parse(bd));
		} catch (ParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		user.setProvince("四川省");
		user.setCity("成都市");
		user.setCounty("高新區");
		user.setAddress("天 府大道XXd段XX號");
		user.setState("1");
		// Everything above is mock data
		ValidationResult vr=ValidationUtils.validateEntity(user);
		if (!vr.isHasErrors()) {
			user.setId(getUserId());
			System.out.println(JSON.toJSONString(user));
			return redisService.addUser(getKey(user.getId()),user);
		}else {
			return Mono.just(vr.getFirstErrors());
		}
		
	}
	
	@GetMapping("/addlist")
	public Mono<Long> addlist(){
		List<String> list=new ArrayList<String>();
		User user = new User();
		user.setAccount("admin1");
		user.setPassword("123123");
		user.setNickname("admin");
		user.setEmail("[email protected]");
		user.setPhone("13666275002");
		user.setSex(true);
		user.setBirthday(new Date());
		user.setProvince("四川省");
		user.setCity("成都市");
		user.setCounty("高新區");
		user.setAddress("天 府大道XXd段XX號");
		user.setState("1");
		// Add the first record
		Long id=redisGenerateId.generate("user");
		user.setId(id);
		list.add(JSON.toJSONString(user));
		// Add the second record
		id=redisGenerateId.generate("user");
		user.setId(id);
		list.add(JSON.toJSONString(user));
		// Add the third record
		id=redisGenerateId.generate("user");
		user.setId(id);
		list.add(JSON.toJSONString(user));
		
		return redisService.addlist("list", list);
	}
	
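	/**
	 * This is the streaming reactive endpoint: elements are returned one at a time, asynchronously.
	 * delayElements(Duration.ofSeconds(2)) is not required; it only makes the effect easier to see.
	 * With Redis the elements already come back one by one even without produces;
	 * I have not yet looked into why.
	 */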
	@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux<String> findAll(){
		return redisService.findAll("list").delayElements(Duration.ofSeconds(2));
	}
	
	@GetMapping("/getUsers")
	public Flux<String> findUsers() {
		// TODO Auto-generated method stub
		return redisService.findUsers(USER_KEY+"_"+"*").delayElements(Duration.ofSeconds(2));
	}
}

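One endpoint reads a list data type and the other looks up values by key pattern. Both return elements one at a time. In real code, simply remove .delayElements(Duration.ofSeconds(2)).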

Integrating mongodb-reactive

Required packages: on top of the Redis setup, just add the following dependency

        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
		</dependency>

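MongoDB is much friendlier to work with. It is a true starter package: once the database connection is configured, no other configuration is needed and it works out of the box.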

DAO

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

import com.flying.cattle.wf.entity.User;

public interface UserRepository extends ReactiveMongoRepository<User, Long>{

}

SERVICE (I will not paste the interface code)

@Service
public class MongoServiceImpl implements MongoService {
	
	@Autowired
	private UserRepository userRepository;
	
	@Override
	public Mono<User> getById(Long id) {
		// TODO Auto-generated method stub
		return userRepository.findById(id);
	}

	@Override
	public Mono<User> addUser(User user) {
		// TODO Auto-generated method stub
		return userRepository.save(user);
	}

	@Override
	public Mono<Boolean> deleteById(Long id) {
		// Return the delete operation itself so that the caller subscribes to it.
		// A reactive call that is never subscribed to does not execute.
		return userRepository.deleteById(id).thenReturn(true);
	}

	@Override
	public Mono<User> updateById(User user) {
		// TODO Auto-generated method stub
		return userRepository.save(user);
	}

	@Override
	public Flux<User> findAllUser() {
		// TODO Auto-generated method stub
		return userRepository.findAll();
	}
}

CONTROLLER

@RestController
@RequestMapping("/usermg")
public class UserMongoController {
	
	public final static String USER_KEY="user";
	
	@Autowired
	private RedisGenerateId redisGenerateId;
	
	@Autowired
	private MongoService mongoService;
	
	@GetMapping("/getId")
	public Long getUserId(){
		return redisGenerateId.generate(USER_KEY);
	}

	@GetMapping("/add")
	public Mono<User> add(User user) {
		user = new User();
		user.setAccount("admin1");
		user.setPassword("123123");
		user.setNickname("admin");
		user.setEmail("[email protected]");
		user.setPhone("13666275002");
		user.setSex(true);
		String bd = "1990-01-01";
		DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
		try {
			user.setBirthday(fmt.parse(bd));
		} catch (ParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		user.setProvince("四川省");
		user.setCity("成都市");
		user.setCounty("高新區");
		user.setAddress("天 府大道XXd段XX號");
		user.setState("1");
		// Everything above is mock data
		ValidationResult vr = ValidationUtils.validateEntity(user);
		if (!vr.isHasErrors()) {
			user.setId(getUserId());
			System.out.println(JSON.toJSONString(user));
			return mongoService.addUser(user);
		} else {
			 System.err.println(vr.getFirstErrors());
		}
		// Validation failed: complete without emitting a user instead of returning null
		return Mono.empty();
	}
	
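	/**
	 * Note produces = MediaType.APPLICATION_STREAM_JSON_VALUE here.
	 * If the media type is not application/stream+json, the caller cannot receive results incrementally
	 * and will block until the data stream ends or times out.
	 */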
	@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux<User> findAll(){
		return mongoService.findAllUser().delayElements(Duration.ofSeconds(1));
	}
}
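
To show what "receiving results incrementally" looks like on the calling side, here is a small self-contained sketch using only JDK classes. It assumes the stream is delivered as one JSON object per line, which is how application/stream+json responses are commonly framed. The server below is a stand-in built on the JDK's HttpServer, not the controller above; StreamJsonClientDemo and its method names are made up, and the /usermg/findAll path is reused only for illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class StreamJsonClientDemo {

    /** Stand-in server: writes one JSON object per line and flushes after each. */
    static HttpServer startStubServer(List<String> jsonObjects) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/usermg/findAll", exchange -> {
            exchange.getResponseHeaders().add("Content-Type", "application/stream+json");
            exchange.sendResponseHeaders(200, 0); // 0 = unknown length, chunked transfer
            try (OutputStream body = exchange.getResponseBody()) {
                for (String json : jsonObjects) {
                    body.write((json + "\n").getBytes(StandardCharsets.UTF_8));
                    body.flush(); // push each element as soon as it is ready
                }
            }
        });
        server.start();
        return server;
    }

    /** Client: handles each line as it arrives instead of buffering the whole body. */
    static List<String> readElements(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .build();
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", "application/stream+json")
                .build();
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        List<String> elements = new ArrayList<>();
        try (Stream<String> lines = response.body()) {
            lines.forEach(elements::add); // called once per received element
        }
        return elements;
    }

    /** Runs the stub server and the client together and returns what the client received. */
    static List<String> runDemo(List<String> jsonObjects) {
        HttpServer server = null;
        try {
            server = startStubServer(jsonObjects);
            int port = server.getAddress().getPort();
            return readElements(URI.create("http://127.0.0.1:" + port + "/usermg/findAll"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (server != null) {
                server.stop(0);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo(List.of("{\"id\":1}", "{\"id\":2}")));
    }
}
```

Against the real endpoint, the same readElements method could be pointed at http://localhost:8080/usermg/findAll (port taken from the YAML above). With the one-second delayElements in place, the lines should arrive roughly one per second rather than all at once.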

That is all the code. If you want to try this framework out, I recommend using MongoDB, since Redis is mainly used for caching.

Data structure diagram (image not preserved).

Source code: https://gitee.com/flying-cattle/infrastructure/tree/ma