Lettuce RedisClusterClient: Usage and Source Code Analysis
Data Sharding in Redis Cluster
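Redis Cluster does not use consistent hashing. Instead it introduces hash slots. A Redis cluster has 16384 hash slots, and each key is assigned to a slot by computing the CRC16 of the key modulo 16384. Each node in the cluster is responsible for a subset of the slots. In other words, as long as a key does not change, its slot does not change either.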
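To make the key-to-slot rule concrete, here is a minimal Java sketch of it. It assumes the CRC16 variant described in the Redis Cluster specification (XMODEM: polynomial 0x1021, initial value 0) and also implements the specification's hash-tag rule: if the key contains a non-empty section between the first "{" and the next "}", only that section is hashed. The class name HashSlot is hypothetical; Lettuce has its own implementation, which is not reproduced here.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: maps a key to one of the 16384 Redis Cluster hash slots. */
final class HashSlot {
    static final int SLOT_COUNT = 16384;

    /** Bitwise CRC16-XMODEM (poly 0x1021, init 0, no reflection, no final XOR). */
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                if ((crc & 0x8000) != 0) {
                    crc = ((crc << 1) ^ 0x1021) & 0xFFFF;
                } else {
                    crc = (crc << 1) & 0xFFFF;
                }
            }
        }
        return crc;
    }

    static int slot(String key) {
        String hashed = key;
        // Hash tag: hash only the text between the first '{' and the next '}', if non-empty
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                hashed = key.substring(open + 1, close);
            }
        }
        // 16384 is a power of two, so "mod 16384" is the same as "& 16383"
        return crc16(hashed.getBytes(StandardCharsets.UTF_8)) & (SLOT_COUNT - 1);
    }
}
```

For example, HashSlot.slot("test") returns 6918, the same slot reported by the MOVED reply in the redirection example. Keys such as {user1000}.following and {user1000}.followers always land in the same slot.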
You can inspect the cluster state with the cluster info command:
cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:12
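The cluster nodes command lists the current node together with the slots assigned to each node. In the output below, the cluster has 12 nodes, all masters, and each one manages roughly 1365 slots (16384 / 12 ≈ 1365.3).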
xx.xxx.xxx.xx:6959> cluster nodes
45abb8663c0cdb25ed17c29521bf6fda98e913ea xx.xxx.xxx.xx:6961 master - 0 1529229636724 11 connected 13653-15018
e40080f32a3fb89e34b7622038ce490682428fdf xx.xxx.xxx.xx:6960 master - 0 1529229633723 10 connected 12288-13652
a749bba5614680dea9f47e3c8fe595aa8be71a2c xx.xxx.xxx.xx:6954 master - 0 1529229639230 4 connected 4096-5460
1096e2a8737401b66c7d4ee0addcb10d7ff14088 xx.xxx.xxx.xx:6952 master - 0 1529229636224 2 connected 1365-2730
fbc76f3481271241c1a89fabeb5139905e1ec2a6 xx.xxx.xxx.xx:6962 master - 0 1529229638230 12 connected 15019-16383
85601fa67820a5af0de0cc21d102d72575709ec6 xx.xxx.xxx.xx:6959 myself,master - 0 0 9 connected 10923-12287
c00d86999c98f97d697f3a2b33ba26fbf50e46eb xx.xxx.xxx.xx:6955 master - 0 1529229634724 5 connected 5461-6826
0b09a5c4c9e9158520389dd2672bd711d55085c6 xx.xxx.xxx.xx:6953 master - 0 1529229637227 3 connected 2731-4095
9f26d208fa8772449d5c322eb63786a1cf9937e0 xx.xxx.xxx.xx:6958 master - 0 1529229635224 8 connected 9557-10922
274294a88758fcb674e1a0292db0e36a66a0bf48 xx.xxx.xxx.xx:6951 master - 0 1529229634223 1 connected 0-1364
369780bdf56d483a0f0a92cb2baab786844051f3 xx.xxx.xxx.xx:6957 master - 0 1529229640232 7 connected 8192-9556
71ed0215356c664cc56d4579684e86a83dba3a92 xx.xxx.xxx.xx:6956 master - 0 1529229635724 6 connected 6827-8191
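A client has to turn this output into a slot map. The sketch below parses it into slot ranges. It assumes the field layout shown above (node id, address, flags, master id, ping sent, pong received, config epoch, link state, then slots). SlotRange and ClusterNodesParser are hypothetical names and are not Lettuce classes. Lettuce does its own parsing internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper type: a contiguous range of hash slots owned by one master. */
final class SlotRange {
    final int start;
    final int end;
    final String address;

    SlotRange(int start, int end, String address) {
        this.start = start;
        this.end = end;
        this.address = address;
    }

    int size() {
        return end - start + 1;
    }
}

final class ClusterNodesParser {
    /** Fields per line: id address flags master-id ping-sent pong-recv config-epoch link-state slot... */
    static List<SlotRange> parse(String output) {
        List<SlotRange> ranges = new ArrayList<>();
        for (String line : output.split("\n")) {
            String[] f = line.trim().split("\\s+");
            // Replicas, and masters without slots, have no slot fields
            if (f.length < 9 || !f[2].contains("master")) {
                continue;
            }
            // Newer Redis versions print "ip:port@cport"; keep only ip:port
            String address = f[1].split("@")[0];
            for (int i = 8; i < f.length; i++) {
                String s = f[i];
                if (s.startsWith("[")) {
                    continue; // migrating/importing markers such as [slot->-nodeid]
                }
                int dash = s.indexOf('-');
                int start = Integer.parseInt(dash < 0 ? s : s.substring(0, dash));
                int end = dash < 0 ? start : Integer.parseInt(s.substring(dash + 1));
                ranges.add(new SlotRange(start, end, address));
            }
        }
        return ranges;
    }
}
```

Applied to the line for port 6951, the parser yields the single range 0-1364, which is 1365 slots.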
Request Redirection
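Each node owns only a subset of the slots, and slots can migrate from one node to another. A client may therefore send a request to the wrong node. A mechanism is needed to detect and correct this, and that mechanism is request redirection. There are two redirection scenarios: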
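- MOVED
Declares that ownership of the slot has been transferred. A client that receives it should update its key-to-node mapping.
- ASK
Declares a temporary state. Ownership has not been transferred yet, so the client does not update its mapping. Instead, it resends the command to the indicated node and precedes it with an ASKING command, which tells the node that the client understands this temporary state.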
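Both redirections arrive as error replies of the form MOVED <slot> <host>:<port> or ASK <slot> <host>:<port>. A minimal parser might look like the sketch below. The class name Redirect is hypothetical and is not Lettuce's API. The only real difference between the two cases is what the client does next.

```java
/** Hypothetical helper: a parsed MOVED/ASK error reply, e.g. "MOVED 6918 10.0.0.6:6956". */
final class Redirect {
    final boolean moved; // true: MOVED (update slot map); false: ASK (one-off, send ASKING first)
    final int slot;
    final String host;
    final int port;

    private Redirect(boolean moved, int slot, String host, int port) {
        this.moved = moved;
        this.slot = slot;
        this.host = host;
        this.port = port;
    }

    /** Returns null if the error is not a MOVED/ASK redirect. */
    static Redirect parse(String error) {
        if (error == null) {
            return null;
        }
        String[] parts = error.trim().split("\\s+");
        if (parts.length != 3 || !(parts[0].equals("MOVED") || parts[0].equals("ASK"))) {
            return null;
        }
        // Split at the last ':' so the port is always the final component
        int colon = parts[2].lastIndexOf(':');
        if (colon <= 0) {
            return null;
        }
        return new Redirect(parts[0].equals("MOVED"),
                Integer.parseInt(parts[1]),
                parts[2].substring(0, colon),
                Integer.parseInt(parts[2].substring(colon + 1)));
    }
}
```

On MOVED, a client records the new owner of that slot. On ASK, it sends ASKING followed by the command to the given node once and leaves its mapping unchanged.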
Query the value of the key test through the cluster:
xx.xxx.xxx.xx:6959> get test
(error) MOVED 6918 xx.xxx.xx.xxx:6956
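The reply means that the key test hashes to slot 6918, which is served by the instance on port 6956. Querying that instance returns the cached value: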
xx.xxx.xx.xxx:6956> get test
"cluster"
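The example above shows that reading the value took two round trips to the cluster. The key-to-slot algorithm is public, so a client can compute the slot directly from the key. If it also knows the slot range each node manages, it can determine which node owns the key and fetch the value in a single request. This is exactly what Lettuce does.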
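The lookup half of this idea can be sketched as follows, assuming the slot has already been computed from the key. SlotRouter is a hypothetical name. Keying ranges by their first slot in a TreeMap is just one compact choice; a client could equally use a 16384-entry array indexed by slot.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical client-side routing table: slot ranges keyed by their first slot. */
final class SlotRouter {
    private static final class Range {
        final int end;
        final String node;

        Range(int end, String node) {
            this.end = end;
            this.node = node;
        }
    }

    private final TreeMap<Integer, Range> byStart = new TreeMap<>();

    void assign(int start, int end, String node) {
        byStart.put(start, new Range(end, node));
    }

    /** Returns the node that owns the slot, or null if no range covers it. */
    String nodeFor(int slot) {
        // The range with the greatest start <= slot is the only candidate
        Map.Entry<Integer, Range> e = byStart.floorEntry(slot);
        return (e != null && slot <= e.getValue().end) ? e.getValue().node : null;
    }
}
```

Using the ranges for ports 6955 and 6956 from the cluster nodes output, slot 6918 resolves directly to the instance on port 6956 without a MOVED round trip.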
Using Lettuce
@Bean(name = "clusterRedisURI")
RedisURI clusterRedisURI() {
    // Seed node; the remaining cluster nodes are discovered from it
    return RedisURI.builder().withHost("xx.xx.xxx.xx").withPort(6954).build();
}

@Bean
ClusterClientOptions clusterClientOptions() {
    // autoReconnect(true): reconnect automatically after a connection loss
    // maxRedirects(1): follow at most one MOVED/ASK redirect per command
    return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1).build();
}

@Bean
RedisClusterClient redisClusterClient(ClientResources clientResources,
                                      ClusterClientOptions clusterClientOptions,
                                      RedisURI clusterRedisURI) {
    RedisClusterClient redisClusterClient = RedisClusterClient.create(clientResources, clusterRedisURI);
    redisClusterClient.setOptions(clusterClientOptions);
    return redisClusterClient;
}

/**
 * Cluster mode
 */
@Bean(destroyMethod = "close")
StatefulRedisClusterConnection<String, String> statefulRedisClusterConnection(RedisClusterClient redisClusterClient) {
    return redisClusterClient.connect();
}
Relevant Lettuce Source Code
The cluster topology is discovered proactively when the connection is created:
<K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) {
    // Initialize the partition (topology) information if it has not been loaded yet
    if (partitions == null) {
        initializePartitions();
    }
    // Activate topology refresh if needed
    activateTopologyRefreshIfNeeded();
    // ...

protected void initializePartitions() {
    this.partitions = loadPartitions();
}

protected Partitions loadPartitions() {
    // Get the sources to load the topology from
    Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();
    String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;
    try {
        // Load the topology views
        Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());
        // ...
public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) {
    // Command timeout, 60 seconds by default
    long commandTimeoutNs = getCommandTimeoutNs(seed);
    Connections connections = null;
    try {
        // Connect to all seed nodes
        connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS);
        Requests requestedTopology = connections.requestTopology();
        Requests requestedClients = connections.requestClients();
        // Build the topology view reported by each node
        NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);
        if (discovery) { // Look for nodes beyond the seeds
            // All nodes known to the cluster
            Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes();
            // Exclude the seeds to get the nodes that still need to be discovered
            Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed));
            // If there are newly discovered nodes
            if (!discoveredNodes.isEmpty()) {
                // Connect to the discovered nodes
                Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs, TimeUnit.NANOSECONDS);
                // Merge the connections
                connections = connections.mergeWith(discoveredConnections);
                // Merge the requests
                requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology());
                requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients());
                // Rebuild the per-node views
                nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);
                // Return the partitions keyed by node URI
                return nodeSpecificViews.toMap();
            }
        }
        return nodeSpecificViews.toMap();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RedisCommandInterruptedException(e);
    } finally {
        if (connections != null) {
            connections.close();
        }
    }
}
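Stripped of connections and futures, the two-phase flow of loadViews can be sketched as follows. In this sketch, a node's view is simplified to the set of node addresses it reports, and the clusterNodes function stands in for querying a node. Both are hypothetical simplifications, not Lettuce types.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical model of seed-then-discover topology loading. */
final class TopologyDiscovery {
    static Map<String, Set<String>> loadViews(Set<String> seeds, boolean discovery,
                                              Function<String, Set<String>> clusterNodes) {
        // Phase 1: ask every seed which nodes it knows about
        Map<String, Set<String>> views = new LinkedHashMap<>();
        for (String seed : seeds) {
            views.put(seed, clusterNodes.apply(seed));
        }
        if (discovery) {
            // All nodes reported by any seed...
            Set<String> discovered = new LinkedHashSet<>();
            for (Set<String> view : views.values()) {
                discovered.addAll(view);
            }
            // ...minus the seeds themselves (the role of difference(allKnownUris, toSet(seed)))
            discovered.removeAll(seeds);
            // Phase 2: query only the newly discovered nodes and merge their views
            for (String node : discovered) {
                views.put(node, clusterNodes.apply(node));
            }
        }
        return views;
    }
}
```

With a single seed and discovery enabled, every node the seed reports ends up with its own view. With discovery disabled, only the seed's view is returned.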
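By the time the connection has been created, the client already knows every active node in the cluster. As covered in the previous article, cluster commands are handled by ClusterDistributionChannelWriter, and some of this information is set up when the writer is initialized: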
class ClusterDistributionChannelWriter implements RedisChannelWriter {
    // Default writer
    private final RedisChannelWriter defaultWriter;
    // Cluster event listener
    private final ClusterEventListener clusterEventListener;
    private final int executionLimit;
    // Cluster connection provider
    private ClusterConnectionProvider clusterConnectionProvider;
    // Asynchronous cluster connection provider
    private AsyncClusterConnectionProvider asyncClusterConnectionProvider;
    // Whether the writer has been closed
    private boolean closed = false;
    // Partition (topology) information
    private volatile Partitions partitions;
    // ...
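Commands are written as follows. The writer computes the slot from the key, finds the node that owns that slot, and sends the command directly to that node. This avoids the extra round trip caused by a redirect.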
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
    LettuceAssert.notNull(command, "Command must not be null");
    // Fail fast if the connection has been closed
    if (closed) {
        throw new RedisException("Connection is closed");
    }
    // A ClusterCommand that is not done yet
    if (command instanceof ClusterCommand && !command.isDone()) {
        ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command;
        if (clusterCommand.isMoved() || clusterCommand.isAsk()) {
            HostAndPort target;
            boolean asking;
            // MOVED: we got here via the retry in ClusterCommand.complete()
            if (clusterCommand.isMoved()) {
                // Parse the redirect target from the error message
                target = getMoveTarget(clusterCommand.getError());
                // Fire the MOVED redirection event
                clusterEventListener.onMovedRedirection();
                asking = false;
            } else { // ASK
                target = getAskTarget(clusterCommand.getError());
                asking = true;
                clusterEventListener.onAskRedirection();
            }
            // Clear the redirect error before resending
            command.getOutput().setError((String) null);
            // Get a connection to the redirect target
            CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = asyncClusterConnectionProvider
                    .getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, target.getHostText(), target.getPort());
            // Connection already available: send the command to that node right away
            if (isSuccessfullyCompleted(connectFuture)) {
                writeCommand(command, asking, connectFuture.join(), null);
            } else {
                // Otherwise send it once the connection completes (or fail it on error)
                connectFuture.whenComplete((connection, throwable) -> writeCommand(command, asking, connection, throwable));
            }
            return command;
        }
    }
    // Not a redirect: on the first attempt the command is a plain RedisCommand,
    // so wrap it in a ClusterCommand
    ClusterCommand<K, V, T> commandToSend = getCommandToSend(command);
    // Command arguments
    CommandArgs<K, V> args = command.getArgs();
    // CLIENT commands are not routed by slot
    if (args != null && !CommandType.CLIENT.equals(commandToSend.getType())) {
        // First encoded key of the command
        ByteBuffer encodedKey = args.getFirstEncodedKey();
        if (encodedKey != null) {
            // Compute the slot
            int hash = getSlot(encodedKey);
            // Derive the intent (read or write) from the command type
            ClusterConnectionProvider.Intent intent = getIntent(command.getType());
            // Get a connection for this intent and slot
            CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider)
                    .getConnectionAsync(intent, hash);
            // Connection already available: write immediately
            if (isSuccessfullyCompleted(connectFuture)) {
                writeCommand(commandToSend, false, connectFuture.join(), null);
            } else { // Not ready yet or failed: write once the future completes
                connectFuture.whenComplete((connection, throwable) -> writeCommand(commandToSend, false, connection, throwable));
            }
            return commandToSend;
        }
    }
    // No key, or a CLIENT command: send through the default writer
    writeCommand(commandToSend, defaultWriter);
    return commandToSend;
}
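The routing decision for a first attempt can be reduced to the model below. This is a hypothetical sketch, not Lettuce's code. Commands are only a type name plus an optional first key, and the slot and node lookups are injected as functions. The read-only command set is an illustrative subset and does not reflect what getIntent() actually uses.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntFunction;
import java.util.function.ToIntFunction;

/** Hypothetical reduction of the first-attempt routing in write(). */
final class FirstAttemptRouter {
    enum Intent { READ, WRITE }

    // Illustrative subset only; not Lettuce's list of read-only commands
    private static final Set<String> READ_ONLY = new HashSet<>(Arrays.asList("GET", "EXISTS", "TTL", "STRLEN"));

    static Intent intentOf(String type) {
        return READ_ONLY.contains(type) ? Intent.READ : Intent.WRITE;
    }

    /** Returns "default" when the command is not routed by slot, else "<intent>@<node>". */
    static String route(String type, String firstKey,
                        ToIntFunction<String> slotOf, IntFunction<String> nodeForSlot) {
        if ("CLIENT".equals(type) || firstKey == null) {
            return "default"; // mirrors the fallback to defaultWriter
        }
        int slot = slotOf.applyAsInt(firstKey);
        return intentOf(type) + "@" + nodeForSlot.apply(slot);
    }
}
```

A keyed GET is routed with READ intent to the slot owner, and a SET is routed with WRITE intent. CLIENT commands and keyless commands fall back to the default writer, as in write().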
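What happens if the computed slot is no longer on that node, for example because the cluster was scaled out and the slot was migrated? The ClusterCommand source shows that this case is handled in its complete() method. If the reply is MOVED (or ASK), the command is written again, this time to the redirect target. The maximum number of redirects is configurable and defaults to 5. The configuration above sets maxRedirects(1), so only one redirect is allowed.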
@Override
public void complete() {
    // The reply was MOVED or ASK
    if (isMoved() || isAsk()) {
        // Redirect only while the redirect budget has not been used up
        boolean retryCommand = maxRedirections > redirections;
        // Count this redirect
        redirections++;
        if (retryCommand) {
            try {
                // Re-dispatch through the writer, which sends it to the redirect target
                retry.write(this);
            } catch (Exception e) {
                completeExceptionally(e);
            }
            return;
        }
    }
    super.complete();
    completed = true;
}
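The effect of the maxRedirections > redirections check can be simulated without a cluster. In this hypothetical sketch, the cluster is a function from attempt number to reply. With a budget of 1, matching the maxRedirects(1) configuration above, a second consecutive MOVED (for example while a slot is still being resharded) is returned to the caller instead of being followed.

```java
import java.util.function.IntFunction;

/** Hypothetical simulation of the redirect budget enforced in ClusterCommand.complete(). */
final class RedirectBudget {
    static final class Outcome {
        final String reply;
        final int attempts;

        Outcome(String reply, int attempts) {
            this.reply = reply;
            this.attempts = attempts;
        }
    }

    static Outcome execute(int maxRedirections, IntFunction<String> replyForAttempt) {
        int redirections = 0;
        int attempts = 0;
        while (true) {
            String reply = replyForAttempt.apply(attempts++);
            boolean redirect = reply.startsWith("MOVED ") || reply.startsWith("ASK ");
            if (!redirect) {
                return new Outcome(reply, attempts); // normal reply: the command completes
            }
            boolean retryCommand = maxRedirections > redirections; // same check as complete()
            redirections++;
            if (!retryCommand) {
                return new Outcome(reply, attempts); // budget exhausted: the redirect error surfaces
            }
        }
    }
}
```

With a budget of 5 (the default), the same command survives two consecutive MOVED replies and succeeds on the third attempt.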
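For ASK, the client must send ASKING to the redirect target before the command itself. In the code below, ASKING is issued through connection.async(), so the client does not wait for its reply. Ordering is still guaranteed because ASKING and the command go over the same connection, and Redis processes the commands on one connection in order.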
private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, boolean asking,
        StatefulRedisConnection<K, V> connection, Throwable throwable) {
    // Obtaining the connection failed: fail the command
    if (throwable != null) {
        command.completeExceptionally(throwable);
        return;
    }
    try {
        // For ASK redirects, send ASKING first on the same connection
        if (asking) {
            connection.async().asking();
        }
        // Send the command itself
        writeCommand(command, ((RedisChannelHandler<K, V>) connection).getChannelWriter());
    } catch (Exception e) {
        command.completeExceptionally(e);
    }
}
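A toy model of the importing node shows why ASKING has to travel on the same connection, immediately before the command. According to the Redis Cluster specification, ASKING sets a one-time flag on the client connection that lets the node serve one query for a slot it is importing. Without that flag, the node redirects with MOVED. The ImportingNode class below is hypothetical and models only that rule.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of a node that is importing slot 6918. */
final class ImportingNode {
    // Per-connection, one-shot flag set by ASKING
    private boolean askingFlag = false;

    String handle(String command) {
        if (command.equals("ASKING")) {
            askingFlag = true;
            return "OK";
        }
        boolean allowed = askingFlag;
        askingFlag = false; // ASKING only applies to the very next command
        return allowed ? "served: " + command : "MOVED 6918 owner:6956";
    }

    /** Handles a pipelined batch strictly in order, as a single Redis connection does. */
    List<String> pipeline(List<String> commands) {
        List<String> replies = new ArrayList<>();
        for (String c : commands) {
            replies.add(handle(c));
        }
        return replies;
    }
}
```

Pipelining ASKING, GET test, GET test yields OK, then the served GET, then MOVED for the second GET. The flag covers exactly one command, which is why writeCommand sends ASKING right before every ASK-redirected command.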