1. 程式人生 > >Redis叢集批量操作

Redis叢集批量操作

      眾所周知,Redis叢集是沒法執行批量操作命令的,如mget,pipeline等。這是因為redis將叢集劃分為16383個雜湊槽,不同的key會劃分到不同的槽中。但是,Jedis客戶端提供了計算key的slot方法,已經slot和節點之間的對映關係,通過這兩個資料,就可以計算出每個key所在的節點,然後使用pipeline獲取資料。具體程式碼如下:

/**
 * 根據key計算slot,再根據slot計算node,獲取pipeline
 * 進行批量操作 
 */
public class BatchUtil {

	public static Map<String,String> mget(JedisCluster jc,String... keys){
		Map<String,String> resMap = new HashMap<>();
		if(keys == null || keys.length == 0){
			return resMap;
		}
		//如果只有一條,直接使用get即可
		if(keys.length == 1){
			resMap.put(keys[0],jc.get(keys[0]));
			return resMap;
		}
		
		//JedisCluster繼承了BinaryJedisCluster
		//BinaryJedisCluster的JedisClusterConnectionHandler屬性
		//裡面有JedisClusterInfoCache,根據這一條繼承鏈,可以獲取到JedisClusterInfoCache
		//從而獲取slot和JedisPool直接的對映
		MetaObject metaObject = SystemMetaObject.forObject(jc);
		JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
		
		//儲存地址+埠和命令的對映
		Map<JedisPool,List<String>> jedisPoolMap = new HashMap<>();
		
		List<String> keyList = null;
		JedisPool currentJedisPool = null;
		Pipeline currentPipeline = null;
		
		for(String key:keys){
			//計算雜湊槽
			int crc = JedisClusterCRC16.getSlot(key);
			//通過雜湊槽獲取節點的連線
			currentJedisPool = cache.getSlotPool(crc);
			
			//由於JedisPool作為value儲存在JedisClusterInfoCache中的一個map物件中,每個節點的
			//JedisPool在map的初始化階段就是確定的和唯一的,所以獲取到的每個節點的JedisPool都是一樣
			//的,可以作為map的key
			if(jedisPoolMap.containsKey(currentJedisPool)){
				jedisPoolMap.get(currentJedisPool).add(key);
			}else{
				keyList = new ArrayList<>();
				keyList.add(key);
				jedisPoolMap.put(currentJedisPool, keyList);
			}
		}
		
		//儲存結果
		List<Object> res = new ArrayList<>();
		
		//執行
		for(Entry<JedisPool,List<String>> entry:jedisPoolMap.entrySet()){
			try {
				currentJedisPool = entry.getKey();
				keyList = entry.getValue();
				//獲取pipeline
				currentPipeline = currentJedisPool.getResource().pipelined();
				for(String key:keyList){
					currentPipeline.get(key);
				}
				//從pipeline中獲取結果
				res = currentPipeline.syncAndReturnAll();
				currentPipeline.close();
				for(int i=0;i<keyList.size();i++){
					resMap.put(keyList.get(i),res.get(i)==null?null:res.get(i).toString());
				}
			} catch (Exception e) {
				e.printStackTrace();
				return new HashMap<>();
			}
			
		}
		return resMap;
	}
	
}