Spring-data-redis + Lettuce 如何使用 Pipeline
關於 spring-data-redis 和 lettuce,筆者寫過不少文章:
- 這個 Redis 連線池的新監控方式針不戳~我再加一點佐料
- spring-data-redis 連線洩漏,我 TM 人傻了
- spring-data-redis 動態切換資料來源
- spring-data-redis 上百萬的 QPS 壓力太大連線失敗,我 TM 人傻了
最近,私信還有留言中,網友提到 spring-data-redis 和 lettuce 一起使用,pipeline 通過抓包一看,並沒有生效,這個如何配置才能生效呢?
首先,在上面的文章中,我們分析過 Spring-data-redis + Lettuce 的基本原理,在這種環境下 RedisTemplate 使用的連線內部包括:
- asyncSharedConn:可以為空,如果開啟了連線共享,則不為空,預設是開啟的;所有 LettuceConnection 共享的 Redis 連線,對於每個 LettuceConnection 實際上都是同一個連線;用於執行簡單命令,因為 Netty 客戶端與 Redis 的單處理執行緒特性,共享同一個連線也是很快的。如果沒開啟連線共享,則這個欄位為空,使用 asyncDedicatedConn 執行命令。
- asyncDedicatedConn:私有連線,如果需要保持會話,執行事務,以及 Pipeline 命令,固定連線,則必須使用這個 asyncDedicatedConn 執行 Redis 命令。
execute(RedisCallback)
,流程是:
對於 executePipelined(RedisCallback)
,如果使用正確的話,會使用 asyncDedicatedConn
私有連線執行。那麼怎麼算使用正確呢?
需要使用回撥的連線進行 Redis 呼叫,不能直接使用 redisTemplate
呼叫,否則 pipeline 不生效:
Pipeline 生效:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { connection.get("test".getBytes()); connection.get("test2".getBytes()); return null; } });
Pipeline 不生效:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
redisTemplate.opsForValue().get("test");
redisTemplate.opsForValue().get("test2");
return null;
}
});
這樣我們就能使用保證 API 層正確使用 pipeline 了,但是預設配置的情況下, 底層還是沒有執行 Pipeline,這是怎麼回事呢?
Redis Pipeline 類比 Lettuce 中的 AutoFlushCommands
Redis Pipeline 是 Redis 中的 批量操作,它能將一組 Redis 命令進行組裝,通過一次傳輸給 Redis 並返回結果集,大大減少了如果命令時一條條單獨傳輸需要的 RTT 時間(包括 Redis 客戶端,Redis 服務端切換系統呼叫傳送接收資料的時間,以及網路傳輸時間)。
如果原來的命令是這麼傳送的:
Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4
那麼使用 PIPELINE 之後,命令就是類似於這麼傳送的
Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4
我們可以看出,其實它的原理,就是客戶端先將所有命令拼接在一起然後本地快取起來,之後統一發到服務端,服務端執行所有命令之後,統一響應。
Lettuce 的連線有一個 AutoFlushCommands 配置,就是指在這個連線上執行的命令,如果傳送到服務端。預設是 false,即收到一個命令就發到服務端一個。如果配置為 false,則將所有命令快取起來,手動呼叫 flushCommands 的時候,將快取的命令一起發到服務端,這樣其實就是實現了 Pipeline。
配置 Spring-data-redis + Lettuce 使用 Pipeline
Spring-data-redis 從 2.3.0 版本開始,對於 Lettuce 也相容了 Pipeline 配置,參考:
- DATAREDIS-1011 - Allow configuration of Lettuce pipelining flush behavior
- https://github.com/spring-projects/spring-data-redis/issues/1581
我們可以這樣配置:
@Bean
public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//在 LettuceConnectionFactory 這個 Bean 初始化之後,設定 PipeliningFlushPolicy 為 flushOnClose
if (bean instanceof LettuceConnectionFactory) {
LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;
lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());
}
return bean;
}
};
}
我們來看下這個 PipeliningFlushPolicy 的原始碼就知道這個 flushOnClose 的意義:
public interface PipeliningFlushPolicy {
//其實就是預設的每個命令都直接發到 Redis Server
static PipeliningFlushPolicy flushEachCommand() {
return FlushEachCommand.INSTANCE;
}
//在連線關閉的時候,將命令一起發到 Redis
static PipeliningFlushPolicy flushOnClose() {
return FlushOnClose.INSTANCE;
}
//手動設定在多少條命令之後,統一發到 Redis,但是同樣的,連線關閉的時候也會發到 Redis
static PipeliningFlushPolicy buffered(int bufferSize) {
return () -> new BufferedFlushing(bufferSize);
}
}
這三個類也都實現了 PipeliningFlushState
介面:
public interface PipeliningFlushState {
//對於 executePipelined,剛開始就會呼叫 connection.openPipeline(); 開啟 pipeline,裡面會呼叫這個方法
void onOpen(StatefulConnection<?, ?> connection);
//對於 executePipelined 中的每個命令都會呼叫這個方法
void onCommand(StatefulConnection<?, ?> connection);
//在 executePipelined 的最後會呼叫 connection.closePipeline(),裡面會呼叫這個方法
void onClose(StatefulConnection<?, ?> connection);
}
預設的每個命令都直接發到 Redis Server 的實現是:其實就是方法裡什麼都不做。
private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
INSTANCE;
@Override
public PipeliningFlushState newPipeline() {
return INSTANCE;
}
@Override
public void onOpen(StatefulConnection<?, ?> connection) {}
@Override
public void onCommand(StatefulConnection<?, ?> connection) {}
@Override
public void onClose(StatefulConnection<?, ?> connection) {}
}
對於 flushOnClose:
private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
INSTANCE;
@Override
public PipeliningFlushState newPipeline() {
return INSTANCE;
}
@Override
public void onOpen(StatefulConnection<?, ?> connection) {
//首先配置連線的 AutoFlushCommands 為 false,這樣命令就不會立刻發到 Redis
connection.setAutoFlushCommands(false);
}
@Override
public void onCommand(StatefulConnection<?, ?> connection) {
//收到命令時什麼都不做
}
@Override
public void onClose(StatefulConnection<?, ?> connection) {
//在 pipeline 關閉的時候傳送所有命令
connection.flushCommands();
//恢復預設配置,這樣連線如果退回連線池不會影響後續使用
connection.setAutoFlushCommands(true);
}
}
對於 buffered:
private static class BufferedFlushing implements PipeliningFlushState {
private final AtomicLong commands = new AtomicLong();
private final int flushAfter;
public BufferedFlushing(int flushAfter) {
this.flushAfter = flushAfter;
}
@Override
public void onOpen(StatefulConnection<?, ?> connection) {
//首先配置連線的 AutoFlushCommands 為 false,這樣命令就不會立刻發到 Redis
connection.setAutoFlushCommands(false);
}
@Override
public void onCommand(StatefulConnection<?, ?> connection) {
//如果命令達到指定個數,就發到 Redis
if (commands.incrementAndGet() % flushAfter == 0) {
connection.flushCommands();
}
}
@Override
public void onClose(StatefulConnection<?, ?> connection) {
//在 pipeline 關閉的時候傳送所有命令
connection.flushCommands();
//恢復預設配置,這樣連線如果退回連線池不會影響後續使用
connection.setAutoFlushCommands(true);
}
}
微信搜尋“我的程式設計喵”關注公眾號,每日一刷,輕鬆提升技術,斬獲各種offer: