1. 程式人生 > >Jedis Cluster源碼分析

Jedis Cluster源碼分析

false ret pts on() finished oid private new redirect

最近一個項目用到Jedis客戶端,需要對這個客戶端進行改造。看了一下Jedis Cluster源碼,做個記錄

首先,說核心內容, 在Jedis源碼中,關於cluster有個兩個重要的map。一個是nodes,一個是slots

nodes: host:port ----> JedisPool

slots: slot ----> JedisPool

nodes存放的是key為host:port到JedisPool的映射

slots存放的 slot到JedisPool的映射

這裏,JedisPool是用apache common pool存放jedis對象的pool,slot是通過Crc16算出對16384取余得到

上個Jedis Cluster的Demo吧

 1 import redis.clients.jedis.HostAndPort;
 2 import redis.clients.jedis.JedisCluster;
 3 import java.util.HashSet;
 4 import java.util.Set;
 5 
 6 /**
 7  * Created by guanxianseng on 2017/8/15.
 8  *
 9  * nodes: host:port -> JedisPool
10  * slots: slot -> JedisPool
11  */
12
public class TestCluster { 13 public static void main(String[] args) { 14 Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>(); 15 jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7340)); 16 jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7341));
17 jedisClusterNodes.add(new HostAndPort("192.168.211.131", 7342)); 18 JedisCluster jc = new JedisCluster(jedisClusterNodes); 19 jc.set("name", "guanxianseng"); 20 System.out.println(jc.get("name")); 21 } 22 }

輸出

guanxianseng

Process finished with exit code 0

這裏IP是我的虛擬機的IP,開了兩臺虛擬機,部署的是三主三從的集群

首先,進入JedisCluster的構造函數,一路找下去,我們會看到這樣的代碼

1 public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
2                                        final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
3     this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
4     initializeSlotsCache(nodes, poolConfig, password);
5   }

進入initializeSlotsCache方法

 1 private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
 2     for (HostAndPort hostAndPort : startNodes) {
 3       Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
 4       if (password != null) {
 5         jedis.auth(password);
 6       }
 7       try {
 8         cache.discoverClusterNodesAndSlots(jedis);
 9         break;
10       } catch (JedisConnectionException e) {
11         // try next nodes
12       } finally {
13         if (jedis != null) {
14           jedis.close();
15         }
16       }
17     }
18   }

這裏,獲取集群節點的jedis對象,進入discoverClusterNodesAndSlots(jedis)

 1 public void discoverClusterNodesAndSlots(Jedis jedis) {
 2     w.lock();
 3 
 4     try {
 5       reset();
 6       List<Object> slots = jedis.clusterSlots();
 7 
 8       for (Object slotInfoObj : slots) {
 9         List<Object> slotInfo = (List<Object>) slotInfoObj;
10 
11         if (slotInfo.size() <= MASTER_NODE_INDEX) {
12           continue;
13         }
14 
15         List<Integer> slotNums = getAssignedSlotArray(slotInfo);
16 
17         // hostInfos
18         int size = slotInfo.size();
19         for (int i = MASTER_NODE_INDEX; i < size; i++) {
20           List<Object> hostInfos = (List<Object>) slotInfo.get(i);
21           if (hostInfos.size() <= 0) {
22             continue;
23           }
24 
25           HostAndPort targetNode = generateHostAndPort(hostInfos);
26           setupNodeIfNotExist(targetNode);
27           if (i == MASTER_NODE_INDEX) {
28             assignSlotsToNode(slotNums, targetNode);
29           }
30         }
31       }
32     } finally {
33       w.unlock();
34     }
35   }

第6行,其實就是執行slots命令。進入getAssignedSlotArray方法

private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
    List<Integer> slotNums = new ArrayList<Integer>();
    for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
        .intValue(); slot++) {
      slotNums.add(slot);
    }
    return slotNums;
  }

這裏獲取了,節點分配的slots

回到上面,進入generateHostAndPort方法

private HostAndPort generateHostAndPort(List<Object> hostInfos) {
    return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
        ((Long) hostInfos.get(1)).intValue());
  }

這裏獲取到節點的host和port

回到上面,進入setupNodeIfNotExist(targetNode);

 1 public JedisPool setupNodeIfNotExist(HostAndPort node) {
 2     w.lock();
 3     try {
 4       String nodeKey = getNodeKey(node);
 5       JedisPool existingPool = nodes.get(nodeKey);
 6       if (existingPool != null) return existingPool;
 7 
 8       JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
 9           connectionTimeout, soTimeout, password, 0, null, false, null, null, null);
10       nodes.put(nodeKey, nodePool);
11       return nodePool;
12     } finally {
13       w.unlock();
14     }
15   }

這裏設置我們一開始提到的nodes, host:port -------> JedisPool映射

繼續回到上面,進入assignSlotsToNode(slotNums, targetNode);

 1 public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
 2     w.lock();
 3     try {
 4       JedisPool targetPool = setupNodeIfNotExist(targetNode);
 5       for (Integer slot : targetSlots) {
 6         slots.put(slot, targetPool);
 7       }
 8     } finally {
 9       w.unlock();
10     }
11   }

這裏設置了前面說的slots, slot ------> JedisPool的映射

這裏初始化完成

執行set命令

1 @Override
2   public String set(final String key, final String value) {
3     return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
4       @Override
5       public String execute(Jedis connection) {
6         return connection.set(key, value);
7       }
8     }.run(key);
9   }

進入run(key);方法

1 public T run(String key) {
2     if (key == null) {
3       throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
4     }
5 
6     return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
7   }

進入runWithRetries()

 1 private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
 2     if (attempts <= 0) {
 3       throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
 4     }
 5 
 6     Jedis connection = null;
 7     try {
 8 
 9       if (asking) {
10         // TODO: Pipeline asking with the original command to make it
11         // faster....
12         connection = askConnection.get();
13         connection.asking();
14 
15         // if asking success, reset asking flag
16         asking = false;
17       } else {
18         if (tryRandomNode) {
19           connection = connectionHandler.getConnection();
20         } else {
21           connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
22         }
23       }
24 
25       return execute(connection);
26 
27     } catch (JedisNoReachableClusterNodeException jnrcne) {

這裏有點長,截取了前面一部分

1 connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));

這裏,計算key的slot,從slots獲取Jedis對象

到這,基本已完成

總結一下,執行slots命令,緩存host:port --> JedisPool, slot ---->JedisPool映射。執行命令,key ---> slot ----> JedisPool ------->Jedis

 

Jedis Cluster源碼分析