1. 程式人生 > 其它 >Spring-data-redis + Lettuce 如何使用 Pipeline

Spring-data-redis + Lettuce 如何使用 Pipeline

關於 spring-data-redis 和 lettuce,筆者寫過不少文章:

最近,私信還有留言中,網友提到 spring-data-redis 和 lettuce 一起使用,pipeline 通過抓包一看,並沒有生效,這個如何配置才能生效呢?

首先,在上面的文章中,我們分析過 Spring-data-redis + Lettuce 的基本原理,在這種環境下 RedisTemplate 使用的連線內部包括:

  • asyncSharedConn:可以為空,如果開啟了連線共享,則不為空,預設是開啟的;所有 LettuceConnection 共享的 Redis 連線,對於每個 LettuceConnection 實際上都是同一個連線;用於執行簡單命令,因為 Netty 客戶端與 Redis 的單處理執行緒特性,共享同一個連線也是很快的。如果沒開啟連線共享,則這個欄位為空,使用 asyncDedicatedConn 執行命令。
  • asyncDedicatedConn:私有連線,如果需要保持會話,執行事務,以及 Pipeline 命令,固定連線,則必須使用這個 asyncDedicatedConn 執行 Redis 命令。

execute(RedisCallback),流程是:

對於 executePipelined(RedisCallback),如果使用正確的話,會使用 asyncDedicatedConn 私有連線執行。那麼怎麼算使用正確呢?

需要使用回撥的連線進行 Redis 呼叫,不能直接使用 redisTemplate 呼叫,否則 pipeline 不生效

Pipeline 生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        connection.get("test".getBytes());
        connection.get("test2".getBytes());
        return null;
    }
});

Pipeline 不生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        redisTemplate.opsForValue().get("test");
        redisTemplate.opsForValue().get("test2");
        return null;
    }
});

這樣我們就能使用保證 API 層正確使用 pipeline 了,但是預設配置的情況下, 底層還是沒有執行 Pipeline,這是怎麼回事呢?

Redis Pipeline 類比 Lettuce 中的 AutoFlushCommands

Redis Pipeline 是 Redis 中的 批量操作,它能將一組 Redis 命令進行組裝,通過一次傳輸給 Redis 並返回結果集,大大減少了如果命令時一條條單獨傳輸需要的 RTT 時間(包括 Redis 客戶端,Redis 服務端切換系統呼叫傳送接收資料的時間,以及網路傳輸時間)。

如果原來的命令是這麼傳送的:

Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4

那麼使用 PIPELINE 之後,命令就是類似於這麼傳送的

Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4

我們可以看出,其實它的原理,就是客戶端先將所有命令拼接在一起然後本地快取起來,之後統一發到服務端,服務端執行所有命令之後,統一響應。

Lettuce 的連線有一個 AutoFlushCommands 配置,就是指在這個連線上執行的命令,如果傳送到服務端。預設是 false,即收到一個命令就發到服務端一個。如果配置為 false,則將所有命令快取起來,手動呼叫 flushCommands 的時候,將快取的命令一起發到服務端,這樣其實就是實現了 Pipeline。

配置 Spring-data-redis + Lettuce 使用 Pipeline

Spring-data-redis 從 2.3.0 版本開始,對於 Lettuce 也相容了 Pipeline 配置,參考:

我們可以這樣配置:

@Bean
public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            //在 LettuceConnectionFactory 這個 Bean 初始化之後,設定 PipeliningFlushPolicy 為 flushOnClose
            if (bean instanceof LettuceConnectionFactory) {
                LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;
                lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());
            }
            return bean;
        }
    };
}

我們來看下這個 PipeliningFlushPolicy 的原始碼就知道這個 flushOnClose 的意義:

public interface PipeliningFlushPolicy {
    //其實就是預設的每個命令都直接發到 Redis Server
    static PipeliningFlushPolicy flushEachCommand() {
		return FlushEachCommand.INSTANCE;
	}
	//在連線關閉的時候,將命令一起發到 Redis
	static PipeliningFlushPolicy flushOnClose() {
		return FlushOnClose.INSTANCE;
	}
	//手動設定在多少條命令之後,統一發到 Redis,但是同樣的,連線關閉的時候也會發到 Redis
	static PipeliningFlushPolicy buffered(int bufferSize) {
		return () -> new BufferedFlushing(bufferSize);
	}
}

這三個類也都實現了 PipeliningFlushState 介面:

public interface PipeliningFlushState {
    //對於 executePipelined,剛開始就會呼叫 connection.openPipeline(); 開啟 pipeline,裡面會呼叫這個方法
    void onOpen(StatefulConnection<?, ?> connection);
    //對於 executePipelined 中的每個命令都會呼叫這個方法
    void onCommand(StatefulConnection<?, ?> connection);
    //在 executePipelined 的最後會呼叫 connection.closePipeline(),裡面會呼叫這個方法
    void onClose(StatefulConnection<?, ?> connection);
}

預設的每個命令都直接發到 Redis Server 的實現是:其實就是方法裡什麼都不做。

private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
	INSTANCE;
	@Override
	public PipeliningFlushState newPipeline() {
		return INSTANCE;
	}
	@Override
	public void onOpen(StatefulConnection<?, ?> connection) {}
	@Override
	public void onCommand(StatefulConnection<?, ?> connection) {}
	@Override
	public void onClose(StatefulConnection<?, ?> connection) {}
}

對於 flushOnClose:

private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
	INSTANCE;
	@Override
	public PipeliningFlushState newPipeline() {
		return INSTANCE;
	}
	@Override
	public void onOpen(StatefulConnection<?, ?> connection) {
	    //首先配置連線的 AutoFlushCommands 為 false,這樣命令就不會立刻發到 Redis
		connection.setAutoFlushCommands(false);
	}
	@Override
	public void onCommand(StatefulConnection<?, ?> connection) {
        //收到命令時什麼都不做
	}
	@Override
	public void onClose(StatefulConnection<?, ?> connection) {
	    //在 pipeline 關閉的時候傳送所有命令
		connection.flushCommands();
		//恢復預設配置,這樣連線如果退回連線池不會影響後續使用
		connection.setAutoFlushCommands(true);
	}
}

對於 buffered:

private static class BufferedFlushing implements PipeliningFlushState {
	private final AtomicLong commands = new AtomicLong();
	private final int flushAfter;

	public BufferedFlushing(int flushAfter) {
		this.flushAfter = flushAfter;
	}

	@Override
	public void onOpen(StatefulConnection<?, ?> connection) {
	    //首先配置連線的 AutoFlushCommands 為 false,這樣命令就不會立刻發到 Redis
		connection.setAutoFlushCommands(false);
	}

	@Override
	public void onCommand(StatefulConnection<?, ?> connection) {
	    //如果命令達到指定個數,就發到 Redis
		if (commands.incrementAndGet() % flushAfter == 0) {
			connection.flushCommands();
		}
	}

	@Override
	public void onClose(StatefulConnection<?, ?> connection) {
	    //在 pipeline 關閉的時候傳送所有命令
		connection.flushCommands();
		//恢復預設配置,這樣連線如果退回連線池不會影響後續使用
		connection.setAutoFlushCommands(true);
	}
}

微信搜尋“我的程式設計喵”關注公眾號,每日一刷,輕鬆提升技術,斬獲各種offer