redis批量上傳資料(pipeline
阿新 • • 發佈:2018-12-25
package com.test.redis; import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import org.apache.http.annotation.NotThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.BinaryJedisCluster; import redis.clients.jedis.Client; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisClusterConnectionHandler; import redis.clients.jedis.JedisClusterInfoCache; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisSlotBasedConnectionHandler; import redis.clients.jedis.PipelineBase; import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.exceptions.JedisRedirectionException; import redis.clients.util.JedisClusterCRC16; import redis.clients.util.SafeEncoder; /** * 在叢集模式下提供批量操作的功能。 <br/> * 由於叢集模式存在節點的動態新增刪除,且client不能實時感知(只有在執行命令時才可能知道叢集發生變更), * 因此,該實現不保證一定成功,建議在批量操作之前呼叫 refreshCluster() 方法重新獲取叢集資訊。<br /> * 應用需要保證不論成功還是失敗都會呼叫close() 方法,否則可能會造成洩露。<br/> * 如果失敗需要應用自己去重試,因此每個批次執行的命令數量需要控制。防止失敗後重試的數量過多。<br /> * 基於以上說明,建議在叢集環境較穩定(增減節點不會過於頻繁)的情況下使用,且允許失敗或有對應的重試策略。<br /> * * * @author youaremoon * @version * @since Ver 1.1 */ @NotThreadSafe public class JedisClusterPipeline extends PipelineBase implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class); // 部分欄位沒有對應的獲取方法,只能採用反射來做 // 你也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問介面 private static final Field FIELD_CONNECTION_HANDLER; private static final Field FIELD_CACHE; static { FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler"); FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache"); } private JedisSlotBasedConnectionHandler connectionHandler; private JedisClusterInfoCache clusterInfoCache; private Queue<Client> clients = new LinkedList<Client>(); // 根據順序儲存每個命令對應的Client private Map<JedisPool, Jedis> jedisMap = new HashMap<>(); // 用於快取連線 private boolean hasDataInBuf = false; // 是否有資料在快取區 /** * 根據jedisCluster例項生成對應的JedisClusterPipeline * @param * @return */ public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) { JedisClusterPipeline pipeline = new JedisClusterPipeline(); pipeline.setJedisCluster(jedisCluster); return pipeline; } public JedisClusterPipeline() { } public void setJedisCluster(JedisCluster jedis) { connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER); clusterInfoCache = getValue(connectionHandler, FIELD_CACHE); } /** * 重新整理叢集資訊,當叢集資訊發生變更時呼叫 * @param * @return */ public void refreshCluster() { connectionHandler.renewSlotCache(); } /** * 同步讀取所有資料. 與syncAndReturnAll()相比,sync()只是沒有對資料做反序列化 */ public void sync() { innerSync(null); } /** * 同步讀取所有資料 並按命令順序返回一個列表 * * @return 按照命令的順序返回所有的資料 */ public List<Object> syncAndReturnAll() { List<Object> responseList = new ArrayList<Object>(); innerSync(responseList); return responseList; } private void innerSync(List<Object> formatted) { HashSet<Client> clientSet = new HashSet<Client>(); try { for (Client client : clients) { // 在sync()呼叫時其實是不需要解析結果資料的,但是如果不呼叫get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要呼叫get()來觸發錯誤。 // 其實如果Response的data屬性可以直接獲取,可以省掉解析資料的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了 Object data = generateResponse(client.getOne()).get(); if (null != formatted) { formatted.add(data); } // size相同說明所有的client都已經新增,就不用再呼叫add方法了 if (clientSet.size() != jedisMap.size()) { clientSet.add(client); } } } catch (JedisRedirectionException jre) { if (jre instanceof JedisMovedDataException) { // if MOVED redirection occurred, rebuilds cluster's slot cache, // recommended by Redis cluster specification refreshCluster(); } throw jre; } finally { if (clientSet.size() != jedisMap.size()) { // 所有還沒有執行過的client要保證執行(flush),防止放回連線池後後面的命令被汙染 for (Jedis jedis : jedisMap.values()) { if (clientSet.contains(jedis.getClient())) { continue; } flushCachedData(jedis); } } hasDataInBuf = false; close(); } } @Override public void close() { clean(); clients.clear(); for (Jedis jedis : jedisMap.values()) { if (hasDataInBuf) { flushCachedData(jedis); } jedis.close(); } jedisMap.clear(); hasDataInBuf = false; } private void flushCachedData(Jedis jedis) { try { jedis.getClient().getAll(); } catch (RuntimeException ex) { } } @Override protected Client getClient(String key) { byte[] bKey = SafeEncoder.encode(key); return getClient(bKey); } @Override protected Client getClient(byte[] key) { Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key)); Client client = jedis.getClient(); clients.add(client); return client; } private Jedis getJedis(int slot) { JedisPool pool = clusterInfoCache.getSlotPool(slot); // 根據pool從快取中獲取Jedis Jedis jedis = jedisMap.get(pool); if (null == jedis) { jedis = pool.getResource(); jedisMap.put(pool, jedis); } hasDataInBuf = true; return jedis; } private static Field getField(Class<?> cls, String fieldName) { try { Field field = cls.getDeclaredField(fieldName); field.setAccessible(true); return field; } catch (NoSuchFieldException | SecurityException e) { throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e); } } @SuppressWarnings({"unchecked" }) private static <T> T getValue(Object obj, Field field) { try { return (T)field.get(obj); } catch (IllegalArgumentException | IllegalAccessException e) { LOGGER.error("get value fail", e); throw new RuntimeException(e); } } public static void main(String[] args) throws IOException { Set<HostAndPort> nodes = new HashSet<HostAndPort>(); nodes.add(new HostAndPort("127.0.0.1", 9379)); nodes.add(new HostAndPort("127.0.0.1", 9380)); JedisCluster jc = new JedisCluster(nodes); long s = System.currentTimeMillis(); JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jc); jcp.refreshCluster(); List<Object> batchResult = null; try { // batch write for (int i = 0; i < 10000; i++) { jcp.set("k" + i, "v1" + i); } jcp.sync(); // batch read for (int i = 0; i < 10000; i++) { jcp.get("k" + i); } batchResult = jcp.syncAndReturnAll(); } finally { jcp.close(); } // output time long t = System.currentTimeMillis() - s; System.out.println(t); System.out.println(batchResult.size()); // 實際業務程式碼中,close要在finally中調,這裡之所以沒這麼寫,是因為懶 jc.close(); } }