Redis高階客戶端Lettuce詳解
前提
Lettuce
是一個Redis
的Java
驅動包,初識她的時候是使用RedisTemplate
的時候遇到點問題Debug
到底層的一些原始碼,發現spring-data-redis
的驅動包在某個版本之後替換為Lettuce
。Lettuce
翻譯為生菜,沒錯,就是吃的那種生菜,所以它的Logo
長這樣:
既然能被Spring
生態所認可,Lettuce
想必有過人之處,於是筆者花時間閱讀她的官方文件,整理測試示例,寫下這篇文章。編寫本文時所使用的版本為Lettuce 5.1.8.RELEASE
,SpringBoot 2.1.8.RELEASE
,JDK [8,11]
。超長警告:這篇文章斷斷續續花了兩週完成,超過4萬字.....
Lettuce簡介
Lettuce
是一個高效能基於Java
編寫的Redis
驅動框架,底層集成了Project Reactor
提供天然的反應式程式設計,通訊框架集成了Netty
使用了非阻塞IO
,5.x
版本之後融合了JDK1.8
的非同步程式設計特性,在保證高效能的同時提供了十分豐富易用的API
,5.1
版本的新特性如下:
- 支援
Redis
的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX
。 - 支援通過
Brave
模組跟蹤Redis
命令執行。 - 支援
Redis Streams
。 - 支援非同步的主從連線。
- 支援非同步連線池。
- 新增命令最多執行一次模式(禁止自動重連)。
- 全域性命令超時設定(對非同步和反應式命令也有效)。
- ......等等
注意一點:Redis
的版本至少需要2.6
,當然越高越好,API
的相容性比較強大。
只需要引入單個依賴就可以開始愉快地使用Lettuce
:
- Maven
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
- Gradle
dependencies { compile 'io.lettuce:lettuce-core:5.1.8.RELEASE' }
連線Redis
單機、哨兵、叢集模式下連線Redis
需要一個統一的標準去表示連線的細節資訊,在Lettuce
中這個統一的標準是RedisURI
。可以通過三種方式構造一個RedisURI
例項:
- 定製的字串
URI
語法:
RedisURI uri = RedisURI.create("redis://localhost/");
- 使用建造器(
RedisURI.Builder
):
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
- 直接通過建構函式例項化:
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);
定製的連線URI語法
- 單機(字首為
redis://
)
格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:redis://[email protected]:6379/0?timeout=10s
簡單:redis://localhost
- 單機並且使用
SSL
(字首為rediss://
) <== 注意後面多了個s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:rediss://[email protected]:6379/0?timeout=10s
簡單:rediss://localhost
- 單機
Unix Domain Sockets
模式(字首為redis-socket://
)
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]]
完整:redis-socket:///tmp/redis?timeout=10s&_database=0
- 哨兵(字首為
redis-sentinel://
)
格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
完整:redis-sentinel://[email protected]:6379,127.0.0.1:6380/0?timeout=10s#mymaster
超時時間單位:
- d 天
- h 小時
- m 分鐘
- s 秒鐘
- ms 毫秒
- us 微秒
- ns 納秒
個人建議使用RedisURI
提供的建造器,畢竟定製的URI
雖然簡潔,但是比較容易出現人為錯誤。鑑於筆者沒有SSL
和Unix Domain Socket
的使用場景,下面不對這兩種連線方式進行列舉。
基本使用
Lettuce
使用的時候依賴於四個主要元件:
RedisURI
:連線資訊。RedisClient
:Redis
客戶端,特殊地,叢集連線有一個定製的RedisClusterClient
。Connection
:Redis
連線,主要是StatefulConnection
或者StatefulRedisConnection
的子類,連線的型別主要由連線的具體方式(單機、哨兵、叢集、訂閱釋出等等)選定,比較重要。RedisCommands
:Redis
命令API
介面,基本上覆蓋了Redis
發行版本的所有命令,提供了同步(sync
)、非同步(async
)、反應式(reative
)的呼叫方式,對於使用者而言,會經常跟RedisCommands
系列介面打交道。
一個基本使用例子如下:
@Test
public void testSetGet() throws Exception {
RedisURI redisUri = RedisURI.builder() // <1> 建立單機連線的連線資訊
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri); // <2> 建立客戶端
StatefulRedisConnection<String, String> connection = redisClient.connect(); // <3> 建立執行緒安全的連線
RedisCommands<String, String> redisCommands = connection.sync(); // <4> 建立同步命令
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
String result = redisCommands.set("name", "throwable", setArgs);
Assertions.assertThat(result).isEqualToIgnoringCase("OK");
result = redisCommands.get("name");
Assertions.assertThat(result).isEqualTo("throwable");
// ... 其他操作
connection.close(); // <5> 關閉連線
redisClient.shutdown(); // <6> 關閉客戶端
}
注意:
- <5>:關閉連線一般在應用程式停止之前操作,一個應用程式中的一個
Redis
驅動例項不需要太多的連線(一般情況下只需要一個連線例項就可以,如果有多個連線的需要可以考慮使用連線池,其實Redis
目前處理命令的模組是單執行緒,在客戶端多個連線多執行緒呼叫理論上沒有效果)。 - <6>:關閉客戶端一般應用程式停止之前操作,如果條件允許的話,基於後開先閉原則,客戶端關閉應該在連線關閉之後操作。
API
Lettuce
主要提供三種API
:
- 同步(
sync
):RedisCommands
。 - 非同步(
async
):RedisAsyncCommands
。 - 反應式(
reactive
):RedisReactiveCommands
。
先準備好一個單機Redis
連線備用:
private static StatefulRedisConnection<String, String> CONNECTION;
private static RedisClient CLIENT;
@BeforeClass
public static void beforeClass() {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
CLIENT = RedisClient.create(redisUri);
CONNECTION = CLIENT.connect();
}
@AfterClass
public static void afterClass() throws Exception {
CONNECTION.close();
CLIENT.shutdown();
}
Redis
命令API
的具體實現可以直接從StatefulRedisConnection
例項獲取,見其介面定義:
public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {
boolean isMulti();
RedisCommands<K, V> sync();
RedisAsyncCommands<K, V> async();
RedisReactiveCommands<K, V> reactive();
}
值得注意的是,在不指定編碼解碼器RedisCodec
的前提下,RedisClient
建立的StatefulRedisConnection
例項一般是泛型例項StatefulRedisConnection<String,String>
,也就是所有命令API
的KEY
和VALUE
都是String
型別,這種使用方式能滿足大部分的使用場景。當然,必要的時候可以定製編碼解碼器RedisCodec<K,V>
。
同步API
先構建RedisCommands
例項:
private static RedisCommands<String, String> COMMAND;
@BeforeClass
public static void beforeClass() {
COMMAND = CONNECTION.sync();
}
基本使用:
@Test
public void testSyncPing() throws Exception {
String pong = COMMAND.ping();
Assertions.assertThat(pong).isEqualToIgnoringCase("PONG");
}
@Test
public void testSyncSetAndGet() throws Exception {
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
COMMAND.set("name", "throwable", setArgs);
String value = COMMAND.get("name");
log.info("Get value: {}", value);
}
// Get value: throwable
同步API
在所有命令呼叫之後會立即返回結果。如果熟悉Jedis
的話,RedisCommands
的用法其實和它相差不大。
非同步API
先構建RedisAsyncCommands
例項:
private static RedisAsyncCommands<String, String> ASYNC_COMMAND;
@BeforeClass
public static void beforeClass() {
ASYNC_COMMAND = CONNECTION.async();
}
基本使用:
@Test
public void testAsyncPing() throws Exception {
RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();
log.info("Ping result:{}", redisFuture.get());
}
// Ping result:PONG
RedisAsyncCommands
所有方法執行返回結果都是RedisFuture
例項,而RedisFuture
介面的定義如下:
public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {
String getError();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
}
也就是,RedisFuture
可以無縫使用Future
或者JDK
1.8中引入的CompletableFuture
提供的方法。舉個例子:
@Test
public void testAsyncSetAndGet1() throws Exception {
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);
// CompletableFuture#thenAccept()
future.thenAccept(value -> log.info("Set命令返回:{}", value));
// Future#get()
future.get();
}
// Set命令返回:OK
@Test
public void testAsyncSetAndGet2() throws Exception {
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
CompletableFuture<Void> result =
(CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs)
.thenAcceptBoth(ASYNC_COMMAND.get("name"),
(s, g) -> {
log.info("Set命令返回:{}", s);
log.info("Get命令返回:{}", g);
});
result.get();
}
// Set命令返回:OK
// Get命令返回:throwable
如果能熟練使用CompletableFuture
和函數語言程式設計技巧,可以組合多個RedisFuture
完成一些列複雜的操作。
反應式API
Lettuce
引入的反應式程式設計框架是Project Reactor,如果沒有反應式程式設計經驗可以先自行了解一下Project Reactor
。
構建RedisReactiveCommands
例項:
private static RedisReactiveCommands<String, String> REACTIVE_COMMAND;
@BeforeClass
public static void beforeClass() {
REACTIVE_COMMAND = CONNECTION.reactive();
}
根據Project Reactor
,RedisReactiveCommands
的方法如果返回的結果只包含0或1個元素,那麼返回值型別是Mono
,如果返回的結果包含0到N(N大於0)個元素,那麼返回值是Flux
。舉個例子:
@Test
public void testReactivePing() throws Exception {
Mono<String> ping = REACTIVE_COMMAND.ping();
ping.subscribe(v -> log.info("Ping result:{}", v));
Thread.sleep(1000);
}
// Ping result:PONG
@Test
public void testReactiveSetAndGet() throws Exception {
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
REACTIVE_COMMAND.set("name", "throwable", setArgs).block();
REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));
Thread.sleep(1000);
}
// Get命令返回:throwable
@Test
public void testReactiveSet() throws Exception {
REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block();
Flux<String> flux = REACTIVE_COMMAND.smembers("food");
flux.subscribe(log::info);
REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block();
Thread.sleep(1000);
}
// meat
// bread
// fish
舉個更加複雜的例子,包含了事務、函式轉換等:
@Test
public void testReactiveFunctional() throws Exception {
REACTIVE_COMMAND.multi().doOnSuccess(r -> {
REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe();
REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();
}).flatMap(s -> REACTIVE_COMMAND.exec())
.doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded()))
.subscribe();
Thread.sleep(1000);
}
// OK
// 2
// Discarded:false
這個方法開啟一個事務,先把counter
設定為1,再將counter
自增1。
釋出和訂閱
非叢集模式下的釋出訂閱依賴於定製的連線StatefulRedisPubSubConnection
,叢集模式下的釋出訂閱依賴於定製的連線StatefulRedisClusterPubSubConnection
,兩者分別來源於RedisClient#connectPubSub()
系列方法和RedisClusterClient#connectPubSub()
:
- 非叢集模式:
// 可能是單機、普通主從、哨兵等非叢集模式的客戶端
RedisClient client = ...
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });
// 同步命令
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
// 非同步命令
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");
// 反應式命令
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();
reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
- 叢集模式:
// 使用方式其實和非叢集模式基本一致
RedisClusterClient clusterClient = ...
StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
// ...
這裡用單機同步命令的模式舉一個Redis
鍵空間通知(Redis Keyspace Notifications)的例子:
@Test
public void testSyncKeyspaceNotification() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
// 注意這裡只能是0號庫
.withDatabase(0)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisConnection<String, String> redisConnection = redisClient.connect();
RedisCommands<String, String> redisCommands = redisConnection.sync();
// 只接收鍵過期的事件
redisCommands.configSet("notify-keyspace-events", "Ex");
StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
connection.addListener(new RedisPubSubAdapter<>() {
@Override
public void psubscribed(String pattern, long count) {
log.info("pattern:{},count:{}", pattern, count);
}
@Override
public void message(String pattern, String channel, String message) {
log.info("pattern:{},channel:{},message:{}", pattern, channel, message);
}
});
RedisPubSubCommands<String, String> commands = connection.sync();
commands.psubscribe("__keyevent@0__:expired");
redisCommands.setex("name", 2, "throwable");
Thread.sleep(10000);
redisConnection.close();
connection.close();
redisClient.shutdown();
}
// pattern:__keyevent@0__:expired,count:1
// pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name
實際上,在實現RedisPubSubListener
的時候可以單獨抽離,儘量不要設計成匿名內部類的形式。
事務和批量命令執行
事務相關的命令就是WATCH
、UNWATCH
、EXEC
、MULTI
和DISCARD
,在RedisCommands
系列介面中有對應的方法。舉個例子:
// 同步模式
@Test
public void testSyncMulti() throws Exception {
COMMAND.multi();
COMMAND.setex("name-1", 2, "throwable");
COMMAND.setex("name-2", 2, "doge");
TransactionResult result = COMMAND.exec();
int index = 0;
for (Object r : result) {
log.info("Result-{}:{}", index, r);
index++;
}
}
// Result-0:OK
// Result-1:OK
Redis
的Pipeline
也就是管道機制可以理解為把多個命令打包在一次請求傳送到Redis
服務端,然後Redis
服務端把所有的響應結果打包好一次性返回,從而節省不必要的網路資源(最主要是減少網路請求次數)。Redis
對於Pipeline
機制如何實現並沒有明確的規定,也沒有提供特殊的命令支援Pipeline
機制。Jedis
中底層採用BIO
(阻塞IO)通訊,所以它的做法是客戶端快取將要傳送的命令,最後需要觸發然後同步傳送一個巨大的命令列表包,再接收和解析一個巨大的響應列表包。Pipeline
在Lettuce
中對使用者是透明的,由於底層的通訊框架是Netty
,所以網路通訊層面的優化Lettuce
不需要過多幹預,換言之可以這樣理解:Netty
幫Lettuce
從底層實現了Redis
的Pipeline
機制。但是,Lettuce
的非同步API
也提供了手動Flush
的方法:
@Test
public void testAsyncManualFlush() {
// 取消自動flush
ASYNC_COMMAND.setAutoFlushCommands(false);
List<RedisFuture<?>> redisFutures = Lists.newArrayList();
int count = 5000;
for (int i = 0; i < count; i++) {
String key = "key-" + (i + 1);
String value = "value-" + (i + 1);
redisFutures.add(ASYNC_COMMAND.set(key, value));
redisFutures.add(ASYNC_COMMAND.expire(key, 2));
}
long start = System.currentTimeMillis();
ASYNC_COMMAND.flushCommands();
boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));
Assertions.assertThat(result).isTrue();
log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start);
}
// Lettuce cost:1302 ms
上面只是從文件看到的一些理論術語,但是現實是骨感的,對比了下Jedis
的Pipeline
提供的方法,發現了Jedis
的Pipeline
執行耗時比較低:
@Test
public void testJedisPipeline() throws Exception {
Jedis jedis = new Jedis();
Pipeline pipeline = jedis.pipelined();
int count = 5000;
for (int i = 0; i < count; i++) {
String key = "key-" + (i + 1);
String value = "value-" + (i + 1);
pipeline.set(key, value);
pipeline.expire(key, 2);
}
long start = System.currentTimeMillis();
pipeline.syncAndReturnAll();
log.info("Jedis cost:{} ms", System.currentTimeMillis() - start);
}
// Jedis cost:9 ms
個人猜測Lettuce
可能底層並非合併所有命令一次傳送(甚至可能是單條傳送),具體可能需要抓包才能定位。依此來看,如果真的有大量執行Redis
命令的場景,不妨可以使用Jedis
的Pipeline
。
注意:由上面的測試推斷RedisTemplate
的executePipelined()
方法是假的Pipeline
執行方法,使用RedisTemplate
的時候請務必注意這一點。
Lua指令碼執行
Lettuce
中執行Redis
的Lua
命令的同步介面如下:
public interface RedisScriptingCommands<K, V> {
<T> T eval(String var1, ScriptOutputType var2, K... var3);
<T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);
<T> T evalsha(String var1, ScriptOutputType var2, K... var3);
<T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);
List<Boolean> scriptExists(String... var1);
String scriptFlush();
String scriptKill();
String scriptLoad(V var1);
String digest(V var1);
}
非同步和反應式的介面方法定義差不多,不同的地方就是返回值型別,一般我們常用的是eval()
、evalsha()
和scriptLoad()
方法。舉個簡單的例子:
private static RedisCommands<String, String> COMMANDS;
private static String RAW_LUA = "local key = KEYS[1]\n" +
"local value = ARGV[1]\n" +
"local timeout = ARGV[2]\n" +
"redis.call('SETEX', key, tonumber(timeout), value)\n" +
"local result = redis.call('GET', key)\n" +
"return result;";
private static AtomicReference<String> LUA_SHA = new AtomicReference<>();
@Test
public void testLua() throws Exception {
LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));
String[] keys = new String[]{"name"};
String[] args = new String[]{"throwable", "5000"};
String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);
log.info("Get value:{}", result);
}
// Get value:throwable
高可用和分片
為了Redis
的高可用,一般會採用普通主從(Master/Replica
,這裡筆者稱為普通主從模式,也就是僅僅做了主從複製,故障需要手動切換)、哨兵和叢集。普通主從模式可以獨立執行,也可以配合哨兵執行,只是哨兵提供自動故障轉移和主節點提升功能。普通主從和哨兵都可以使用MasterSlave
,通過入參包括RedisClient
、編碼解碼器以及一個或者多個RedisURI
獲取對應的Connection
例項。
這裡注意一點,MasterSlave
中提供的方法如果只要求傳入一個RedisURI
例項,那麼Lettuce
會進行拓撲發現機制,自動獲取Redis
主從節點資訊;如果要求傳入一個RedisURI
集合,那麼對於普通主從模式來說所有節點資訊是靜態的,不會進行發現和更新。
拓撲發現的規則如下:
- 對於普通主從(
Master/Replica
)模式,不需要感知RedisURI
指向從節點還是主節點,只會進行一次性的拓撲查詢所有節點資訊,此後節點資訊會儲存在靜態快取中,不會更新。 - 對於哨兵模式,會訂閱所有哨兵例項並偵聽訂閱/釋出訊息以觸發拓撲重新整理機制,更新快取的節點資訊,也就是哨兵天然就是動態發現節點資訊,不支援靜態配置。
拓撲發現機制的提供API
為TopologyProvider
,需要了解其原理的可以參考具體的實現。
對於叢集(Cluster
)模式,Lettuce
提供了一套獨立的API
。
另外,如果Lettuce
連接面向的是非單個Redis
節點,連線例項提供了資料讀取節點偏好(ReadFrom
)設定,可選值有:
MASTER
:只從Master
節點中讀取。MASTER_PREFERRED
:優先從Master
節點中讀取。SLAVE_PREFERRED
:優先從Slavor
節點中讀取。SLAVE
:只從Slavor
節點中讀取。NEAREST
:使用最近一次連線的Redis
例項讀取。
普通主從模式
假設現在有三個Redis
服務形成樹狀主從關係如下:
- 節點一:localhost:6379,角色為Master。
- 節點二:localhost:6380,角色為Slavor,節點一的從節點。
- 節點三:localhost:6381,角色為Slavor,節點二的從節點。
首次動態節點發現主從模式的節點資訊需要如下構建連線:
@Test
public void testDynamicReplica() throws Exception {
// 這裡只需要配置一個節點的連線資訊,不一定需要是主節點的資訊,從節點也可以
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
RedisClient redisClient = RedisClient.create(uri);
StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);
// 只從從節點讀取資料
connection.setReadFrom(ReadFrom.SLAVE);
// 執行其他Redis命令
connection.close();
redisClient.shutdown();
}
如果需要指定靜態的Redis
主從節點連線屬性,那麼可以這樣構建連線:
@Test
public void testStaticReplica() throws Exception {
List<RedisURI> uris = new ArrayList<>();
RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();
RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();
RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();
uris.add(uri1);
uris.add(uri2);
uris.add(uri3);
RedisClient redisClient = RedisClient.create();
StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient,
new Utf8StringCodec(), uris);
// 只從主節點讀取資料
connection.setReadFrom(ReadFrom.MASTER);
// 執行其他Redis命令
connection.close();
redisClient.shutdown();
}
哨兵模式
由於Lettuce
自身提供了哨兵的拓撲發現機制,所以只需要隨便配置一個哨兵節點的RedisURI
例項即可:
@Test
public void testDynamicSentinel() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withPassword("你的密碼")
.withSentinel("localhost", 26379)
.withSentinelMasterId("哨兵Master的ID")
.build();
RedisClient redisClient = RedisClient.create();
StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);
// 只允許從從節點讀取資料
connection.setReadFrom(ReadFrom.SLAVE);
RedisCommands<String, String> command = connection.sync();
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
command.set("name", "throwable", setArgs);
String value = command.get("name");
log.info("Get value:{}", value);
}
// Get value:throwable
叢集模式
鑑於筆者對Redis
叢集模式並不熟悉,Cluster
模式下的API
使用本身就有比較多的限制,所以這裡只簡單介紹一下怎麼用。先說幾個特性:
下面的API提供跨槽位(Slot
)呼叫的功能:
RedisAdvancedClusterCommands
。RedisAdvancedClusterAsyncCommands
。RedisAdvancedClusterReactiveCommands
。
靜態節點選擇功能:
masters
:選擇所有主節點執行命令。slaves
:選擇所有從節點執行命令,其實就是隻讀模式。all nodes
:命令可以在所有節點執行。
叢集拓撲檢視動態更新功能:
- 手動更新,主動呼叫
RedisClusterClient#reloadPartitions()
。 - 後臺定時更新。
- 自適應更新,基於連線斷開和
MOVED/ASK
命令重定向自動更新。
Redis
叢集搭建詳細過程可以參考官方文件,假設已經搭建好叢集如下(192.168.56.200
是筆者的虛擬機器Host):
- 192.168.56.200:7001 => 主節點,槽位0-5460。
- 192.168.56.200:7002 => 主節點,槽位5461-10922。
- 192.168.56.200:7003 => 主節點,槽位10923-16383。
- 192.168.56.200:7004 => 7001的從節點。
- 192.168.56.200:7005 => 7002的從節點。
- 192.168.56.200:7006 => 7003的從節點。
簡單的叢集連線和使用方式如下:
@Test
public void testSyncCluster(){
RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();
RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.setex("name",10, "throwable");
String value = commands.get("name");
log.info("Get value:{}", value);
}
// Get value:throwable
節點選擇:
@Test
public void testSyncNodeSelection() {
RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
// commands.all(); // 所有節點
// commands.masters(); // 主節點
// 從節點只讀
NodeSelection<String, String> replicas = commands.slaves();
NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();
// 這裡只是演示,一般應該禁用keys *命令
Executions<List<String>> keys = nodeSelectionCommands.keys("*");
keys.forEach(key -> log.info("key: {}", key));
connection.close();
redisClusterClient.shutdown();
}
定時更新叢集拓撲檢視(每隔十分鐘更新一次,這個時間自行考量,不能太頻繁):
@Test
public void testPeriodicClusterTopology() throws Exception {
RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions
.builder()
.enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES))
.build();
redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.setex("name", 10, "throwable");
String value = commands.get("name");
log.info("Get value:{}", value);
Thread.sleep(Integer.MAX_VALUE);
connection.close();
redisClusterClient.shutdown();
}
自適應更新叢集拓撲檢視:
@Test
public void testAdaptiveClusterTopology() throws Exception {
RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder()
.enableAdaptiveRefreshTrigger(
ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS
)
.adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS))
.build();
redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.setex("name", 10, "throwable");
String value = commands.get("name");
log.info("Get value:{}", value);
Thread.sleep(Integer.MAX_VALUE);
connection.close();
redisClusterClient.shutdown();
}
動態命令和自定義命令
自定義命令是Redis
命令有限集,不過可以更細粒度指定KEY
、ARGV
、命令型別、編碼解碼器和返回值型別,依賴於dispatch()
方法:
// 自定義實現PING方法
@Test
public void testCustomPing() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisConnection<String, String> connect = redisClient.connect();
RedisCommands<String, String> sync = connect.sync();
RedisCodec<String, String> codec = StringCodec.UTF8;
String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec));
log.info("PING:{}", result);
connect.close();
redisClient.shutdown();
}
// PING:PONG
// 自定義實現Set方法
@Test
public void testCustomSet() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisConnection<String, String> connect = redisClient.connect();
RedisCommands<String, String> sync = connect.sync();
RedisCodec<String, String> codec = StringCodec.UTF8;
sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec),
new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));
String result = sync.get("name");
log.info("Get value:{}", result);
connect.close();
redisClient.shutdown();
}
// Get value:throwable
動態命令是基於Redis
命令有限集,並且通過註解和動態代理完成一些複雜命令組合的實現。主要註解在io.lettuce.core.dynamic.annotation
包路徑下。簡單舉個例子:
public interface CustomCommand extends Commands {
// SET [key] [value]
@Command("SET ?0 ?1")
String setKey(String key, String value);
// SET [key] [value]
@Command("SET :key :value")
String setKeyNamed(@Param("key") String key, @Param("value") String value);
// MGET [key1] [key2]
@Command("MGET ?0 ?1")
List<String> mGet(String key1, String key2);
/**
* 方法名作為命令
*/
@CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)
String mSet(String key1, String value1, String key2, String value2);
}
@Test
public void testCustomDynamicSet() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisConnection<String, String> connect = redisClient.connect();
RedisCommandFactory commandFactory = new RedisCommandFactory(connect);
CustomCommand commands = commandFactory.getCommands(CustomCommand.class);
commands.setKey("name", "throwable");
commands.setKeyNamed("throwable", "doge");
log.info("MGET ===> " + commands.mGet("name", "throwable"));
commands.mSet("key1", "value1","key2", "value2");
log.info("MGET ===> " + commands.mGet("key1", "key2"));
connect.close();
redisClient.shutdown();
}
// MGET ===> [throwable, doge]
// MGET ===> [value1, value2]
高階特性
Lettuce
有很多高階使用特性,這裡只列舉個人認為常用的兩點:
- 配置客戶端資源。
- 使用連線池。
更多其他特性可以自行參看官方文件。
配置客戶端資源
客戶端資源的設定與Lettuce
的效能、併發和事件處理相關。執行緒池或者執行緒組相關配置佔據客戶端資源配置的大部分(EventLoopGroups
和EventExecutorGroup
),這些執行緒池或者執行緒組是連線程式的基礎元件。一般情況下,客戶端資源應該在多個Redis
客戶端之間共享,並且在不再使用的時候需要自行關閉。筆者認為,客戶端資源是面向Netty
的。注意:除非特別熟悉或者花長時間去測試調整下面提到的引數,否則在沒有經驗的前提下憑直覺修改預設值,有可能會踩坑。
客戶端資源介面是ClientResources
,實現類是DefaultClientResources
。
構建DefaultClientResources
例項:
// 預設
ClientResources resources = DefaultClientResources.create();
// 建造器
ClientResources resources = DefaultClientResources.builder()
.ioThreadPoolSize(4)
.computationThreadPoolSize(4)
.build()
使用:
ClientResources resources = DefaultClientResources.create();
// 非叢集
RedisClient client = RedisClient.create(resources, uri);
// 叢集
RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris);
// ......
client.shutdown();
clusterClient.shutdown();
// 關閉資源
resources.shutdown();
客戶端資源基本配置:
屬性 | 描述 | 預設值 |
---|---|---|
ioThreadPoolSize |
I/O 執行緒數 |
Runtime.getRuntime().availableProcessors() |
computationThreadPoolSize |
任務執行緒數 | Runtime.getRuntime().availableProcessors() |
客戶端資源高階配置:
屬性 | 描述 | 預設值 |
---|---|---|
eventLoopGroupProvider |
EventLoopGroup 提供商 |
- |
eventExecutorGroupProvider |
EventExecutorGroup 提供商 |
- |
eventBus |
事件匯流排 | DefaultEventBus |
commandLatencyCollectorOptions |
命令延時收集器配置 | DefaultCommandLatencyCollectorOptions |
commandLatencyCollector |
命令延時收集器 | DefaultCommandLatencyCollector |
commandLatencyPublisherOptions |
命令延時釋出器配置 | DefaultEventPublisherOptions |
dnsResolver |
DNS 處理器 |
JDK或者Netty 提供 |
reconnectDelay |
重連延時配置 | Delay.exponential() |
nettyCustomizer |
Netty 自定義配置器 |
- |
tracing |
軌跡記錄器 | - |
非叢集客戶端RedisClient
的屬性配置:
Redis
非叢集客戶端RedisClient
本身提供了配置屬性方法:
RedisClient client = RedisClient.create(uri);
client.setOptions(ClientOptions.builder()
.autoReconnect(false)
.pingBeforeActivateConnection(true)
.build());
非叢集客戶端的配置屬性列表:
屬性 | 描述 | 預設值 |
---|---|---|
pingBeforeActivateConnection |
連線啟用之前是否執行PING 命令 |
false |
autoReconnect |
是否自動重連 | true |
cancelCommandsOnReconnectFailure |
重連失敗是否拒絕命令執行 | false |
suspendReconnectOnProtocolFailure |
底層協議失敗是否掛起重連操作 | false |
requestQueueSize |
請求佇列容量 | 2147483647(Integer#MAX_VALUE) |
disconnectedBehavior |
失去連線時候的行為 | DEFAULT |
sslOptions |
SSL配置 |
- |
socketOptions |
Socket 配置 |
10 seconds Connection-Timeout, no keep-alive, no TCP noDelay |
timeoutOptions |
超時配置 | - |
publishOnScheduler |
釋出反應式訊號資料的排程器 | 使用I/O 執行緒 |
叢集客戶端屬性配置:
Redis
叢集客戶端RedisClusterClient
本身提供了配置屬性方法:
RedisClusterClient client = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES))
.enableAllAdaptiveRefreshTriggers()
.build();
client.setOptions(ClusterClientOptions.builder()
.topologyRefreshOptions(topologyRefreshOptions)
.build());
叢集客戶端的配置屬性列表:
屬性 | 描述 | 預設值 |
---|---|---|
enablePeriodicRefresh |
是否允許週期性更新叢集拓撲檢視 | false |
refreshPeriod |
更新叢集拓撲檢視週期 | 60秒 |
enableAdaptiveRefreshTrigger |
設定自適應更新叢集拓撲檢視觸發器RefreshTrigger |
- |
adaptiveRefreshTriggersTimeout |
自適應更新叢集拓撲檢視觸發器超時設定 | 30秒 |
refreshTriggersReconnectAttempts |
自適應更新叢集拓撲檢視觸發重連次數 | 5 |
dynamicRefreshSources |
是否允許動態重新整理拓撲資源 | true |
closeStaleConnections |
是否允許關閉陳舊的連線 | true |
maxRedirects |
叢集重定向次數上限 | 5 |
validateClusterNodeMembership |
是否校驗叢集節點的成員關係 | true |
使用連線池
引入連線池依賴commons-pool2
:
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.7.0</version>
</dependency
基本使用如下:
@Test
public void testUseConnectionPool() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
GenericObjectPool<StatefulRedisConnection<String, String>> pool
= ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);
try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
RedisCommands<String, String> command = connection.sync();
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
command.set("name", "throwable", setArgs);
String n = command.get("name");
log.info("Get value:{}", n);
}
pool.close();
redisClient.shutdown();
}
其中,同步連線的池化支援需要用ConnectionPoolSupport
,非同步連線的池化支援需要用AsyncConnectionPoolSupport
(Lettuce
5.1之後才支援)。
幾個常見的漸進式刪除例子
漸進式刪除Hash中的域-屬性:
@Test
public void testDelBigHashKey() throws Exception {
// SCAN引數
ScanArgs scanArgs = ScanArgs.Builder.limit(2);
// TEMP遊標
ScanCursor cursor = ScanCursor.INITIAL;
// 目標KEY
String key = "BIG_HASH_KEY";
prepareHashTestData(key);
log.info("開始漸進式刪除Hash的元素...");
int counter = 0;
do {
MapScanCursor<String, String> result = COMMAND.hscan(key, cursor, scanArgs);
// 重置TEMP遊標
cursor = ScanCursor.of(result.getCursor());
cursor.setFinished(result.isFinished());
Collection<String> fields = result.getMap().values();
if (!fields.isEmpty()) {
COMMAND.hdel(key, fields.toArray(new String[0]));
}
counter++;
} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
log.info("漸進式刪除Hash的元素完畢,迭代次數:{} ...", counter);
}
private void prepareHashTestData(String key) throws Exception {
COMMAND.hset(key, "1", "1");
COMMAND.hset(key, "2", "2");
COMMAND.hset(key, "3", "3");
COMMAND.hset(key, "4", "4");
COMMAND.hset(key, "5", "5");
}
漸進式刪除集合中的元素:
@Test
public void testDelBigSetKey() throws Exception {
String key = "BIG_SET_KEY";
prepareSetTestData(key);
// SCAN引數
ScanArgs scanArgs = ScanArgs.Builder.limit(2);
// TEMP遊標
ScanCursor cursor = ScanCursor.INITIAL;
log.info("開始漸進式刪除Set的元素...");
int counter = 0;
do {
ValueScanCursor<String> result = COMMAND.sscan(key, cursor, scanArgs);
// 重置TEMP遊標
cursor = ScanCursor.of(result.getCursor());
cursor.setFinished(result.isFinished());
List<String> values = result.getValues();
if (!values.isEmpty()) {
COMMAND.srem(key, values.toArray(new String[0]));
}
counter++;
} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
log.info("漸進式刪除Set的元素完畢,迭代次數:{} ...", counter);
}
private void prepareSetTestData(String key) throws Exception {
COMMAND.sadd(key, "1", "2", "3", "4", "5");
}
漸進式刪除有序集合中的元素:
@Test
public void testDelBigZSetKey() throws Exception {
// SCAN引數
ScanArgs scanArgs = ScanArgs.Builder.limit(2);
// TEMP遊標
ScanCursor cursor = ScanCursor.INITIAL;
// 目標KEY
String key = "BIG_ZSET_KEY";
prepareZSetTestData(key);
log.info("開始漸進式刪除ZSet的元素...");
int counter = 0;
do {
ScoredValueScanCursor<String> result = COMMAND.zscan(key, cursor, scanArgs);
// 重置TEMP遊標
cursor = ScanCursor.of(result.getCursor());
cursor.setFinished(result.isFinished());
List<ScoredValue<String>> scoredValues = result.getValues();
if (!scoredValues.isEmpty()) {
COMMAND.zrem(key, scoredValues.stream().map(ScoredValue<String>::getValue).toArray(String[]::new));
}
counter++;
} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
log.info("漸進式刪除ZSet的元素完畢,迭代次數:{} ...", counter);
}
private void prepareZSetTestData(String key) throws Exception {
COMMAND.zadd(key, 0, "1");
COMMAND.zadd(key, 0, "2");
COMMAND.zadd(key, 0, "3");
COMMAND.zadd(key, 0, "4");
COMMAND.zadd(key, 0, "5");
}
在SpringBoot中使用Lettuce
個人認為,spring-data-redis
中的API
封裝並不是很優秀,用起來比較重,不夠靈活,這裡結合前面的例子和程式碼,在SpringBoot
腳手架專案中配置和整合Lettuce
。先引入依賴:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.1.8.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
</dependencies>
一般情況下,每個應用應該使用單個Redis
客戶端例項和單個連線例項,這裡設計一個腳手架,適配單機、普通主從、哨兵和叢集四種使用場景。對於客戶端資源,採用預設的實現即可。對於Redis
的連線屬性,比較主要的有Host
、Port
和Password
,其他可以暫時忽略。基於約定大於配置的原則,先定製一系列屬性配置類(其實有些配置是可以完全共用,但是考慮到要清晰描述類之間的關係,這裡拆分多個配置屬性類和多個配置方法):
@Data
@ConfigurationProperties(prefix = "lettuce")
public class LettuceProperties {
private LettuceSingleProperties single;
private LettuceReplicaProperties replica;
private LettuceSentinelProperties sentinel;
private LettuceClusterProperties cluster;
}
@Data
public class LettuceSingleProperties {
private String host;
private Integer port;
private String password;
}
@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceReplicaProperties extends LettuceSingleProperties {
}
@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceSentinelProperties extends LettuceSingleProperties {
private String masterId;
}
@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceClusterProperties extends LettuceSingleProperties {
}
配置類如下,主要使用@ConditionalOnProperty
做隔離,一般情況下,很少有人會在一個應用使用一種以上的Redis
連線場景:
@RequiredArgsConstructor
@Configuration
@ConditionalOnClass(name = "io.lettuce.core.RedisURI")
@EnableConfigurationProperties(value = LettuceProperties.class)
public class LettuceAutoConfiguration {
private final LettuceProperties lettuceProperties;
@Bean(destroyMethod = "shutdown")
public ClientResources clientResources() {
return DefaultClientResources.create();
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(name = "lettuce.single.host")
public RedisClient singleRedisClient(ClientResources clientResources) {
LettuceSingleProperties singleProperties = lettuceProperties.getSingle();
RedisURI uri = RedisURI.builder()
.withHost(singleProperties.getHost())
.withPort(singleProperties.getPort())
.withPassword(singleProperties.getPassword())
.build();
return RedisClient.create(clientResources, uri);
}
@Bean(destroyMethod = "close")
@ConditionalOnProperty(name = "lettuce.single.host")
public StatefulRedisConnection<String, String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) {
return singleRedisClient.connect();
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(name = "lettuce.replica.host")
public RedisClient replicaRedisClient(ClientResources clientResources) {
LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica();
RedisURI uri = RedisURI.builder()
.withHost(replicaProperties.getHost())
.withPort(replicaProperties.getPort())
.withPassword(replicaProperties.getPassword())
.build();
return RedisClient.create(clientResources, uri);
}
@Bean(destroyMethod = "close")
@ConditionalOnProperty(name = "lettuce.replica.host")
public StatefulRedisConnection<String, String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient) {
return replicaRedisClient.connect();
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(name = "lettuce.sentinel.host")
public RedisClient sentinelRedisClient(ClientResources clientResources) {
LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel();
RedisURI uri = RedisURI.builder()
.withHost(sentinelProperties.getHost())
.withPort(sentinelProperties.getPort())
.withPassword(sentinelProperties.getPassword())
.withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort())
.withSentinelMasterId(sentinelProperties.getMasterId())
.build();
return RedisClient.create(clientResources, uri);
}
@Bean(destroyMethod = "close")
@ConditionalOnProperty(name = "lettuce.sentinel.host")
public StatefulRedisConnection<String, String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient) {
return sentinelRedisClient.connect();
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(name = "lettuce.cluster.host")
public RedisClusterClient redisClusterClient(ClientResources clientResources) {
LettuceClusterProperties clusterProperties = lettuceProperties.getCluster();
RedisURI uri = RedisURI.builder()
.withHost(clusterProperties.getHost())
.withPort(clusterProperties.getPort())
.withPassword(clusterProperties.getPassword())
.build();
return RedisClusterClient.create(clientResources, uri);
}
@Bean(destroyMethod = "close")
@ConditionalOnProperty(name = "lettuce.cluster")
public StatefulRedisClusterConnection<String, String> clusterConnection(RedisClusterClient clusterClient) {
return clusterClient.connect();
}
}
最後為了讓IDE
識別我們的配置,可以新增IDE
親緣性,/META-INF
資料夾下新增一個檔案spring-configuration-metadata.json
,內容如下:
{
"properties": [
{
"name": "lettuce.single",
"type": "club.throwable.spring.lettuce.LettuceSingleProperties",
"description": "單機配置",
"sourceType": "club.