Redis 釋出訂閱,小功能大用處,真沒那麼廢材!
阿新 • • 發佈:2020-09-23
![jae-park-7GX5aICb5i4-unsplash](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071551245-480949487.jpg)
今天小黑哥來跟大家介紹一下 Redis 釋出/訂閱功能。
也許有的小夥伴對這個功能比較陌生,不太清楚這個功能是幹什麼的,沒關係小黑哥先來舉個例子。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071551671-950298652.jpg)
假設我們有這麼一個業務場景,在網站下單支付以後,需要通知庫存服務進行發貨處理。
上面業務實現不難,我們只要讓庫存服務提供給相關的給口,下單支付之後只要呼叫庫存服務即可。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071551810-1889612249.jpg)
後面如果又有新的業務,比如說積分服務,他需要獲取下單支付的結果,然後增加使用者的積分。
這個實現也不難,讓積分服務同樣提供一個介面,下單支付之後只要呼叫庫存服務即可。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071551948-1456777281.jpg)
如果就兩個業務需要獲取下單支付的結果,那也還好,程式改造也快。可是隨著業務不斷的發展,越來越多的新業務說是要下單支付的結果。
這時我們會發現上面這樣的系統架構存在很多問題:
第一,下單支付業務與其他業務重度耦合,每當有個新業務需要支付結果,就需要改動下單支付的業務。
第二,如果呼叫業務過多,會導致下單支付介面響應時間變長。另外,如果有任一下游介面響應變慢,就會同步導致下單支付介面響應也變長。
第三,如果任一下游介面失敗,可能導致資料不一致的情況。比如說下圖,先呼叫 A,成功之後再呼叫 B,最後再呼叫 C。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071552081-326942269.jpg)
如果在呼叫 B 介面的發生異常,此時可能就導致下單支付介面返回失敗,但是此時 A 介面其實已經呼叫成功,這就代表它內部已經處理下單支付成功的結果。
這樣就會導致 A,B,C 三個下游介面,A 獲取成功獲取支付結果,但是 B,C 沒有拿到,導致三者系統資料不一致的情況。
其實我們仔細想一下,對於下單支付業務來講,它其實不需要關心下游呼叫結果,只要有某種機制通知能通知到他們就可以了。
講到這裡,這就需要引入今天需要介紹釋出訂閱機制。
## Redis 釋出與訂閱
Redis 提供了基於「釋出/訂閱」模式的訊息機制,在這種模式下,訊息釋出者與訂閱者不需要進行直接通訊。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071552234-861298090.jpg)
如上圖所示,訊息釋出者只需要想指定的頻道釋出訊息,訂閱該頻道的每個客戶端都可以接受到到這個訊息。
使用 Redis 釋出訂閱這種機制,對於上面業務,下單支付業務只需要向**支付結果**這個頻道傳送訊息,其他下游業務訂閱**支付結果**這個頻道,就能收相應訊息,然後做出業務處理即可。
這樣就可以解耦系統上下游之間呼叫關係。
接下來我們來看下,我們來看下如何使用 Redis 釋出訂閱功能。
Redis 中提供了一組命令,可以用於釋出訊息,訂閱頻道,取消訂閱以及按照模式訂閱。
首先我們來看下如何釋出一條訊息,其實很簡單隻要使用 **publish** 指令:
```shell
publish channel message
```
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071552359-110330798.jpg)
上圖中,我們使用 **publish** 指令向 **pay_result** 這個頻道傳送了一條訊息。我們可以看到 redis 向我們返回 0 ,這其實代表當前訂閱者個數,由於此時沒有訂閱,所以返回結果為 0 。
接下來我們使用 **subscribe** 訂閱一個或多個頻道
```shell
subscribe channel [channel ...]
```
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071552527-1553825283.jpg)
如上圖所示,我們訂閱 **pay_result** 這個頻道,當有其他客戶端往這個頻道傳送訊息,
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071552708-1128781229.jpg)
當前訂閱者就會收到訊息。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071552825-1554354110.jpg)
我們子在使用訂閱命令,需要主要幾點:
第一,客戶端執行訂閱指令之後,就會進入訂閱狀態,之後就只能接收 **subscribe**、**psubscribe**、**unsubscribe**、**punsubscribe** 這四個命令。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071553029-877652412.jpg)
第二,新訂閱的客戶端,是**無法收到這個頻道之前的訊息**,這是因為 Redis 並不會對釋出的訊息持久化的。
> 相比於很多專業 MQ,比如 kafka、rocketmq 來說, redis 釋出訂閱功能就顯得有點簡陋了。不過 redis 釋出訂閱功能勝在簡單,如果當前場景可以容忍這些缺點,還是可以選擇使用的。
除了上面的功能以外的,Redis 還支援模式匹配的訂閱方式。簡單來說,客戶端可以訂閱一個帶 `*` 號的模式,如果某些頻道的名字與這個模式匹配,那麼當其他客戶端傳送給訊息給這些頻道時,訂閱這個模式的客戶端也將會到收到訊息。
使用 Redis 訂閱模式,我們需要使用一個新的指令 **psubscribe**。
我們執行下面這個指令:
```
psubscribe pay.*
```
那麼一旦有其他客戶端往 **pay** 開頭的頻道,比如 `pay_result`、`pay_xxx`,我們都可以收到訊息。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071553200-1935642593.jpg)
如果需要取消訂閱模式,我們需要使用相應`punsubscribe` 指令,比如取消上面訂閱的模式:
```
punsubscribe pay.*
```
## Redis 客戶端釋出訂閱使用方式
### 基於 Jedis 開發釋出/訂閱
聊完 Redis 釋出訂閱指令,我們來看下 Java Redis 客戶端如何使用釋出訂閱。
> 下面的例子主要基於 Jedis,maven 版本為:
>
> ```xml
>
> redis.clients
> jedis
> 3.1.0
>
> ```
>
> 其他 Redis 客戶端大同小異。
jedis 釋出程式碼比較簡單,只需要呼叫 `Jedis` 類的 `publish` 方法。
```java
// 生產環境千萬不要這麼使用哦,推薦使用 JedisPool 執行緒池的方式
Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxxxx");
jedis.publish("pay_result", "hello world");
```
訂閱的程式碼就相對複雜了,我們需要繼承 `JedisPubSub `實現裡面的相關方法,一旦有其他客戶端往訂閱的頻道上傳送訊息,將會呼叫 `JedisPubSub ` 相應的方法。
```java
private static class MyListener extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println("收到訂閱頻道:" + channel + " 訊息:" + message);
}
@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println("收到具體訂閱頻道:" + channel + "訂閱模式:" + pattern + " 訊息:" + message);
}
}
```
其次我們需要呼叫 `Jedis` 類的 `subscribe` 方法:
```java
Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxx");
jedis.subscribe(new MyListener(), "pay_result");
```
當有其他客戶端往 `pay_result`頻道傳送訊息時,訂閱將會收到訊息。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071553346-126324633.jpg)
不過需要注意的是,`jedis#subscribe` 是一個阻塞方法,呼叫之後將會阻塞主執行緒的,所以如果需要在正式專案使用需要使用非同步執行緒執行,這裡就不演示具體的程式碼了。
### 基於 Spring-Data-Redis 開發釋出訂閱
原生 jedis 釋出訂閱操作,相對來說還是有點複雜。現在我們很多應用已經基於 SpringBoot 開發,使用 `spring-boot-starter-data-redis` ,可以簡化釋出訂閱開發。
首先我們需要引入相應的 startter 依賴:
```xml
org.springframework.boot
spring-boot-starter-data-redis
lettuce-core
io.lettuce
redis.clients
jedis
```
> 這裡我們使用 Jedis 當做底層連線客戶端,所以需要排除 lettuce,然後引入 Jedis 依賴。
然後我們需要建立一個訊息接收類,裡面需要有方法消費訊息:
```java
@Slf4j
public class Receiver {
private AtomicInteger counter = new AtomicInteger();
public void receiveMessage(String message) {
log.info("Received <" + message + ">");
counter.incrementAndGet();
}
public int getCount() {
return counter.get();
}
}
```
接著我們只需要注入 Spring- Redis 相關 Bean,比如:
- `StringRedisTemplate`,用來操作 Redis 命令
- `MessageListenerAdapter` ,訊息監聽器,可以在這個類注入我們上面建立訊息接受類` Receiver`
- `RedisConnectionFactory`, 建立 Redis 底層連線
```java
@Configuration
public class MessageConfiguration {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 訂閱指定頻道使用 ChannelTopic
// 訂閱模式使用 PatternTopic
container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result"));
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
// 注入 Receiver,指定類中的接受方法
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
Receiver receiver() {
return new Receiver();
}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}
```
最後我們使用 `StringRedisTemplate#convertAndSend` 傳送訊息,同時 `Receiver` 將會收到一條訊息。
```java
@SpringBootApplication
public class MessagingRedisApplication {
public static void main(String[] args) throws InterruptedException {
ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);
StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
Receiver receiver = ctx.getBean(Receiver.class);
while (receiver.getCount() == 0) {
template.convertAndSend("pay_result", "Hello from Redis!");
Thread.sleep(500L);
}
System.exit(0);
}
}
```
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071553490-360728283.jpg)
## Redis 釋出訂閱實際應用
### Redis Sentinel 節點發現
**Redis Sentinel** 是 Redis 一套高可用方案,可以在主節點故障的時候,自動將從節點提升為主節點,從而轉移故障。
今天這裡我們不詳細解釋 **Redis Sentinel** 詳細原理,主要來看下 **Redis Sentinel** 如何使用釋出訂閱機制。
**Redis Sentinel** 節點主要使用釋出訂閱機制,實現新節點的發現,以及交換主節點的之間的狀態。
如下所示,每一個 **Sentinel** 節點將會定時向 `_sentinel_:hello` 頻道傳送訊息,並且每個 **Sentinel** 都會訂閱這個節點。
![](https://img2020.cnblogs.com/other/1419561/202009/1419561-20200923071553656-852449901.jpg)
這樣一旦有節點往這個頻道傳送訊息,其他節點就可以立刻收到訊息。
這樣一旦有的新節點加入,它往這個頻道傳送訊息,其他節點收到之後,判斷本地列表並沒有這個節點,於是就可以當做新的節點加入本地節點列表。
除此之外,每次往這個頻道傳送訊息內容可以包含節點的狀態資訊,這樣可以作為後面 **Sentinel** 領導者選舉的依據。
以上都是對於 Redis 服務端來講,對於客戶端來講,我們也可以用到釋出訂閱機制。
當 **Redis Sentinel** 進行主節點故障轉移,這個過程各個階段會通過釋出訂閱對外提供。
對於我們客戶端來講,比較關心切換之後的主節點,這樣我們及時切換主節點的連線(舊節點此時已故障,不能再接受操作指令),
客戶端可以訂閱 `+switch-master`頻道,一旦 **Redis Sentinel** 結束了對主節點的故障轉移就會發布主節點的的訊息。
### redission 分散式鎖
redission 開源框架提供一些便捷操作 Redis 的方法,其中比較出名的 redission 基於 Redis 的實現分散式鎖。
今天我們來看下 Redis 的實現分散式鎖中如何使用 Redis 釋出訂閱機制,提高加鎖的效能。
> PS:redission 分散式鎖實現原理,可以參考之前寫過的文章:
>
> 1. [可重入分散式鎖的實現方式](https://mp.weixin.qq.com/s/3sJ0TfYG3tXLPwBa2AAJBg)
> 2. [Redis 分散式鎖,看似簡單,其實真不簡單](https://mp.weixin.qq.com/s/HlD46m-OP-HDdKJFxgqFYA)
首先我們來看下 redission 加鎖的方法:
```java
Redisson redisson = ....
RLock redissonLock = redisson.getLock("xxxx");
redissonLock.lock();
```
`RLock` 繼承自 Java 標準的 `Lock` 介面,呼叫 `lock` 方法,如果當前鎖已被其他客戶端獲取,那麼當前加鎖的執行緒將會被阻塞,直到其他客戶端釋放這把鎖。
這裡其實有個問題,當前阻塞的執行緒如何感知分散式鎖已被釋放呢?
這裡其實有兩種實現方法:
第一鍾,定時查詢分佈時鎖的狀態,一旦查到鎖已被釋放(*Redis 中不存在這個鍵值*),那麼就去加鎖。
實現偽碼如下:
```java
while (true) {
boolean result=lock();
if (!result) {
Thread.sleep(N);
}
}
```
這種方式實現起來起來簡單,不過缺點也比較多。
如果定時任務時間過短,將會導致查詢次數過多,其實這些都是無效查詢。
如果定時任務休眠時間過長,那又會導致加鎖時間過長,導致加鎖效能不好。
那麼第二種實現方案,就是採用服務通知的機制,當分散式鎖被釋放之後,客戶端可以收到鎖釋放的訊息,然後第一時間再去加鎖。
這個服務通知的機制我們可以使用 Redis 釋出訂閱模式。
當執行緒加鎖失敗之後,執行緒將會訂閱 `redisson_lock__channel_xxx`(xx 代表鎖的名稱) 頻道,使用非同步執行緒監聽訊息,然後利用 Java 中 `Semaphore` 使當前執行緒進入阻塞。
一旦其他客戶端進行解鎖,redission 就會往這個`redisson_lock__channel_xxx` 傳送解鎖訊息。
等非同步執行緒收到訊息,將會呼叫 `Semaphore` 釋放訊號量,從而讓當前被阻塞的執行緒喚醒去加鎖。
> ps:這裡只是簡單描述了 redission 加鎖部分原理,出於篇幅,這裡就不再訊息解析原始碼。
>
> 感興趣的小夥伴可以自己看下 redission 加鎖的原始碼。
通過釋出訂閱機制,被阻塞的執行緒可以及時被喚醒,減少無效的空轉的查詢,有效的提高的加鎖的效率。
> ps: 這種方式,效能確實提高,但是實現起來的複雜度也很高,這部分原始碼有點東西,快看暈了。
## 總結
今天我們主要介紹 Redis 釋出訂閱功能,主要對應的 Redis 命令為:
- **subscribe channel [channel ...]** 訂閱一個或多個頻道
- **unsubscribe channel** 退訂指定頻道
- **publish channel message** 傳送訊息
- **psubscribe pattern** 訂閱指定模式
- **punsubscribe pattern** 退訂指定模式
我們可以利用 Redis 釋出訂閱功能,實現的簡單 MQ 功能,實現上下游的解耦。
不過需要注意了,由於 Redis 釋出的訊息不會被持久化,這就會導致新訂閱的客戶端將不會收到歷史訊息。
所以,如果當前的業務場景不能容忍這些缺點,那還是用專業 MQ 吧。
最後介紹了兩個使用 Redis 釋出訂閱功能使用場景供大家參考。
> 歡迎關注我的公眾號:程式通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的部落格:[studyidea.cn](https://studyi