Sprint Boot如何基於Redis釋出訂閱實現非同步訊息系統的同步呼叫?
前言
在很多網際網路應用系統中,請求處理非同步化是提升系統效能一種常用的手段,而基於訊息系統的非同步處理由於具備高可靠性、高吞吐量的特點,因而在併發請求量比較高的網際網路系統中被廣泛應用。與此同時,這種方案也帶來了呼叫鏈路處理上的問題,因為大部分應用請求都會要求同步響應實時處理結果,而由於請求的處理過程已經通過訊息非同步解耦,所以整個呼叫鏈路就變成了非同步鏈路,此時請求鏈路的發起者如何同步拿到響應結果,就需要進行額外的系統設計考慮。
為了更清晰地理解這個問題,小碼哥以最近正在做的共享單車的IOT系統為例,給大家來一張圖描述下,如圖所示:
在上述系統流程中,終端裝置與服務端之間通過MQTT協議
鎖裝置在收到MQTT開鎖訊息後,會通過嵌入式軟體系統觸發硬體裝置完成開鎖動作,之後就需要通過MQTT上行訊息
現在的問題是通過MQTT協議的開鎖下行訊息、上行訊息已經完全處於兩條不同的非同步網路鏈路,而鏈路的發起者此時卻需要同步等待開鎖結果,但是實際上同步鏈路早已在Iot應用系統向物聯網平臺傳送開鎖訊息後就已經完成,所以為了滿足呼叫方的同步請求/響應需要
解決方案分析
以上問題在使用訊息服務進行非同步解耦的應用場景中是比較普遍的需求,由於非同步呼叫鏈路非常長所以通用的解決思路是在呼叫鏈的起始端進行同步阻塞,而在呼叫鏈的結束端通過回撥的方式來實現,如下圖所示:
在上述圖示中,鏈路起始佇列處在傳送第一次非同步訊息後會開啟一個臨時佇列並同步阻塞監聽該臨時佇列的回撥訊息,而鏈路的結束佇列在完成邏輯處理後需要回調起始佇列監聽的臨時佇列,而由於請求執行緒一直處於阻塞監聽該臨時佇列的狀態,所以一旦收到回撥訊息就可以結束阻塞執行後續流程,從而完成整個鏈路的同步響應。 雖然常見的訊息中介軟體都可以實現以上邏輯,例如小碼哥之前所在的公司就基於RabbitMQ通過臨時佇列的方式實現過訊息鏈路的同步呼叫,但是基於訊息中介軟體的方式多少還是顯得有些繁瑣,對於常見的訊息中介軟體如RocketMQ、RabbitMQ來說非同步訊息才是其強項,如果以大量臨時佇列的建立和銷燬為代價來實現訊息呼叫鏈路的同步,不僅從使用上來說顯得有些麻煩,並且也會對訊息中介軟體的穩定性帶來一些不好的影響。
因此在前面提到的IOT系統中,我們採用了基於Redis的釋出/訂閱功能來實現非同步訊息鏈路的同步化呼叫。而由於Redis的高效能以及Redis的應用場景非常豐富,並且非常適合資料頻繁變動的場景,在系統中既可以作為NoSQL資料庫來使用,同時還支援分散式鎖等功能,因而維護的價效比也相對較高。接下來我們就基於Spring Boot的開發框架來演示如何利用Redis的釋出/訂閱來實現非同步訊息鏈路的同步回撥!
Redis釋出訂閱機制
Redis本身可以通過釋出訂閱機制實現一定的訊息佇列功能,在Redis中通過subscribe/publish等命令可以實現釋出訂閱功能,基於此原先的IOT系統處理示意圖如下:
如上圖所示,在IOT應用端傳送非同步MQTT訊息後會以訊息ID組成的Key作為頻道****,並保持請求執行緒對該頻道的同步監聽,直到收到Iot業務訊息佇列的開鎖結果上行訊息後,在訊息佇列的消費端將該上行訊息釋出至同樣以訊息requestId組成的頻道中,從而實現基於Redis釋出訂閱機制的非同步訊息系統同步呼叫效果。
Spring Boot程式碼實現
下面我們基於Spring Boot演示如何通過程式碼進行實現,建立Spring Boot工程後引入Spring Boot Redis整合依賴包,如下:
<!--Redis依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
複製程式碼
之後在專案的配置檔案中新增Redis服務連線資訊,如下所示:
spring:
redis:
host: 127.0.0.1
port: 6379
password: 123456
複製程式碼
此時專案就具備了訪問Redis的能力,接下來我們通過具體的程式碼實現來進行功能演示。訂閱監聽程式碼如下:
@RestController
@RequestMapping("/iot")
public class IotController {
//注入Redis訊息容器物件
@Autowired
RedisMessageListenerContainer redisMessageListenerContainer;
@RequestMapping(value = "/unLock",method = RequestMethod.POST)
public boolean unLock(@RequestParam(value = "thingName") String thingName,@RequestParam(value = "requestId") String requestId)
throws InterruptedException,ExecutionException,TimeoutException {
//此處實現非同步訊息呼叫處理....
//生成監聽頻道Key
String key = "IOT_" + thingName + "_" + requestId;
//建立監聽Topic
ChannelTopic channelTopic = new ChannelTopic(key);
//建立訊息任務物件
IotMessageTask iotMessageTask = new IotMessageTask();
//任務物件及監聽Topic新增到訊息監聽容器
try {
redisMessageListenerContainer.addMessageListener(new IotMessageListener(iotMessageTask),channelTopic);
System.out.println("start redis subscribe listener->" + key);
//進入同步阻塞等待,超時時間設定為60秒
Message message = (Message) iotMessageTask.getIotMessageFuture().get(60000,TimeUnit.MILLISECONDS);
System.out.println("receive redis callback message->" + message.toString());
} finally {
//銷燬訊息監聽物件
if (iotMessageTask != null) {
redisMessageListenerContainer.removeMessageListener(iotMessageTask.getMessageListener());
}
}
return true;
}
}
複製程式碼
在上述程式碼中,我們模擬了一個開鎖請求,在完成非同步訊息處理後會開啟Redis訂閱監聽,為了實現非同步阻塞還需要我們建立訊息任務物件,程式碼如下:
public class IotMessageTask<T> {
//宣告執行緒非同步阻塞物件(JDK 1.8新提供Api)
private CompletableFuture<T> iotMessageFuture = new CompletableFuture<>();
//宣告訊息監聽物件
private MessageListener messageListener;
//宣告超時時間
private boolean isTimeout;
public IotMessageTask() {
}
public CompletableFuture<T> getIotMessageFuture() {
return iotMessageFuture;
}
public void setIotMessageFuture(CompletableFuture<T> iotMessageFuture) {
this.iotMessageFuture = iotMessageFuture;
}
public MessageListener getMessageListener() {
return messageListener;
}
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
public boolean isTimeout() {
return isTimeout;
}
public void setTimeout(boolean timeout) {
isTimeout = timeout;
}
}
複製程式碼
在訊息任務物件中我們通過JDK1.8新提供的CompletableFuture類實現執行緒阻塞效果,並通過定義訊息監聽物件及超時時間完善處理機制。此外根據Controller層程式碼還需要自定義定義訊息監聽處理物件,程式碼如下:
public class IotMessageListener implements MessageListener {
IotMessageTask iotMessageTask;
public IotMessageListener(IotMessageTask iotMessageTask) {
this.iotMessageTask = iotMessageTask;
}
//實現訊息釋出監聽處理方法
@Override
public void onMessage(Message message,byte[] bytes) {
System.out.println("subscribe redis iot task response:{}" + message.toString());
//執行緒阻塞完成
iotMessageTask.getIotMessageFuture().complete(message);
}
}
複製程式碼
此時就完成了Redis服務訂閱這部分邏輯的編寫,在後續的邏輯處理中需要完成訊息的釋出才能正常結束此處的阻塞等待,接下來我們寫一段程式碼來模擬訊息釋出,程式碼如下:
@RestController
@RequestMapping("/iot")
public class IotCallBackController {
//引入Redis客戶端操作物件
@Autowired
StringRedisTemplate stringRedisTemplate;
@RequestMapping(value = "/unLockCallBack",method = RequestMethod.POST)
public boolean unLockCallBack(@RequestParam(value = "thingName") String thingName,@RequestParam(value = "requestId") String requestId) {
//生成監聽頻道Key
String key = "IOT_" + thingName + "_" + requestId;
//模擬實現訊息回撥
stringRedisTemplate.convertAndSend(key,"this is a redis callback");
return true;
}
}
複製程式碼
此時啟動Spring Boot應用呼叫開鎖模擬介面,邏輯就會暫時處於訂閱等待狀態;之後再模擬呼叫開鎖回撥Redis訊息釋出邏輯,之前的阻塞等待就會因為監聽回撥而完成同步返回。