1. 程式人生 > 程式設計 >Redis 批量操作之 pipeline

Redis 批量操作之 pipeline

業務場景

最近專案中場景需要get一批key的value,因為redis的get操作(不單單是get命令)是阻塞的,如果迴圈取值的話,就算是內網,耗時也是巨大的。所以想到了redis的pipeline命令。

pipeline簡介

非pipeline:client一個請求,redis server一個響應,期間client阻塞

Pipeline:redis的管道命令,允許client將多個請求依次發給伺服器(redis的客戶端,如jedisCluster,lettuce等都實現了對pipeline的封裝),過程中而不需要等待請求的回覆,在最後再一併讀取結果即可。

單機版

單機版比較簡單,直接上程式碼

//換成真實的redis例項
Jedis jedis = new Jedis();
//獲取管道
Pipeline p = jedis.pipelined();
for (int i = 0; i < 10000; i++) {
    p.get(i + "");
}
//獲取結果
List<Object> results = p.syncAndReturnAll();

複製程式碼

叢集版

因為 JedisCluster 本身不支援 pipeline ,所以我們需要對 JedisCluster 進行一些封裝。

還是一樣,直接上程式碼


import lombok.extern.slf4j.Slf4j;
import
redis.clients.jedis.*; import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.exceptions.JedisRedirectionException; import redis.clients.util.JedisClusterCRC16; import redis.clients.util.SafeEncoder; import java.io.Closeable; import java.lang.reflect.Field; import
java.util.*; import java.util.function.BiConsumer; @Slf4j public class JedisClusterPipeline extends PipelineBase implements Closeable { /** * 用於獲取 JedisClusterInfoCache */ private JedisSlotBasedConnectionHandler connectionHandler; /** * 根據hash值獲取連線 */ private JedisClusterInfoCache clusterInfoCache; /** * 也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問介面 * JedisCluster繼承於BinaryJedisCluster * 在BinaryJedisCluster,connectionHandler屬性protected修飾的,所以需要反射 * * * 而 JedisClusterInfoCache 屬性在JedisClusterConnectionHandler中,但是這個類是抽象類, * 但它有一個實現類JedisSlotBasedConnectionHandler */ private static final Field FIELD_CONNECTION_HANDLER; private static final Field FIELD_CACHE; static { FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class,"connectionHandler"); FIELD_CACHE = getField(JedisClusterConnectionHandler.class,"cache"); } /** * 根據順序儲存每個命令對應的Client */ private Queue<Client> clients = new LinkedList<>(); /** * 用於快取連線 * 一次pipeline過程中使用到的jedis快取 */ private Map<JedisPool,Jedis> jedisMap = new HashMap<>(); /** * 是否有資料在快取區 */ private boolean hasDataInBuf = false; /** * 根據jedisCluster例項生成對應的JedisClusterPipeline * 通過此方式獲取pipeline進行操作的話必須呼叫close()關閉管道 * 呼叫本類裡pipelineXX方法則不用close(),但建議最好還是在finally裡呼叫一下close() * @param * @return */ public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) { JedisClusterPipeline pipeline = new JedisClusterPipeline(); pipeline.setJedisCluster(jedisCluster); return pipeline; } public JedisClusterPipeline() { } public void setJedisCluster(JedisCluster jedis) { connectionHandler = getValue(jedis,FIELD_CONNECTION_HANDLER); clusterInfoCache = getValue(connectionHandler,FIELD_CACHE); } /** * 重新整理叢集資訊,當叢集資訊發生變更時呼叫 * @param * @return */ public void refreshCluster() { connectionHandler.renewSlotCache(); } /** * 同步讀取所有資料. 與syncAndReturnAll()相比,sync()只是沒有對資料做反序列化 */ public void sync() { innerSync(null); } /** * 同步讀取所有資料 並按命令順序返回一個列表 * * @return 按照命令的順序返回所有的資料 */ public List<Object> syncAndReturnAll() { List<Object> responseList = new ArrayList<>(); innerSync(responseList); return responseList; } @Override public void close() { clean(); clients.clear(); for (Jedis jedis : jedisMap.values()) { if (hasDataInBuf) { flushCachedData(jedis); } jedis.close(); } jedisMap.clear(); hasDataInBuf = false; } private void flushCachedData(Jedis jedis) { try { jedis.getClient().getAll(); } catch (RuntimeException ex) { } } @Override protected Client getClient(String key) { byte[] bKey = SafeEncoder.encode(key); return getClient(bKey); } @Override protected Client getClient(byte[] key) { Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key)); Client client = jedis.getClient(); clients.add(client); return client; } private Jedis getJedis(int slot) { JedisPool pool = clusterInfoCache.getSlotPool(slot); // 根據pool從快取中獲取Jedis Jedis jedis = jedisMap.get(pool); if (null == jedis) { jedis = pool.getResource(); jedisMap.put(pool,jedis); } hasDataInBuf = true; return jedis; } public static void pipelineSetEx(String[] keys,String[] values,int[] exps,JedisCluster jedisCluster) { operate(new Command() { @Override public List execute() { JedisClusterPipeline p = pipelined(jedisCluster); for (int i = 0,len = keys.length; i < len; i++) { p.setex(keys[i],exps[i],values[i]); } return p.syncAndReturnAll(); } }); } public static List<Map<String,String>> pipelineHgetAll(String[] keys,JedisCluster jedisCluster) { return operate(new Command() { @Override public List execute() { JedisClusterPipeline p = pipelined(jedisCluster); for (int i = 0,len = keys.length; i < len; i++) { p.hgetAll(keys[i]); } return p.syncAndReturnAll(); } }); } public static List<Boolean> pipelineSismember(String[] keys,String members,JedisCluster jedisCluster) { return operate(new Command() { @Override public List execute() { JedisClusterPipeline p = pipelined(jedisCluster); for (int i = 0,len = keys.length; i < len; i++) { p.sismember(keys[i],members); } return p.syncAndReturnAll(); } }); } public static <O> List pipeline(BiConsumer<O,JedisClusterPipeline> function,O obj,JedisCluster jedisCluster) { return operate(new Command() { @Override public List execute() { JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jedisCluster); function.accept(obj,jcp); return jcp.syncAndReturnAll(); } }); } private void innerSync(List<Object> formatted) { HashSet<Client> clientSet = new HashSet<>(); try { for (Client client : clients) { // 在sync()呼叫時其實是不需要解析結果資料的,但是如果不呼叫get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要呼叫get()來觸發錯誤。 // 其實如果Response的data屬性可以直接獲取,可以省掉解析資料的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了 Object data = generateResponse(client.getOne()).get(); if (null != formatted) { formatted.add(data); } // size相同說明所有的client都已經新增,就不用再呼叫add方法了 if (clientSet.size() != jedisMap.size()) { clientSet.add(client); } } } catch (JedisRedirectionException jre) { if (jre instanceof JedisMovedDataException) { // if MOVED redirection occurred,rebuilds cluster's slot cache, // recommended by Redis cluster specification refreshCluster(); } throw jre; } finally { if (clientSet.size() != jedisMap.size()) { // 所有還沒有執行過的client要保證執行(flush),防止放回連線池後後面的命令被汙染 for (Jedis jedis : jedisMap.values()) { if (clientSet.contains(jedis.getClient())) { continue; } flushCachedData(jedis); } } hasDataInBuf = false; close(); } } private static Field getField(Class<?> cls,String fieldName) { try { Field field = cls.getDeclaredField(fieldName); field.setAccessible(true); return field; } catch (NoSuchFieldException | SecurityException e) { throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(),e); } } @SuppressWarnings({"unchecked" }) private static <T> T getValue(Object obj,Field field) { try { return (T)field.get(obj); } catch (IllegalArgumentException | IllegalAccessException e) { log.error("get value fail",e); throw new RuntimeException(e); } } private static <T> T operate(Command command) { try { return command.execute(); } catch (Exception e) { log.error("redis operate error"); throw new RuntimeException(e); } } interface Command { /** * 具體執行命令 * * @param <T> * @return */ <T> T execute(); } } 複製程式碼

使用demo

    public Object testPipelineOperate() {
        //        String[] keys = {"dylan1","dylan2"};
        //        String[] values = {"dylan1-v1","dylan2-v2"};
        //        int[] exps = {100,200};
        //        JedisClusterPipeline.pipelineSetEx(keys,values,exps,jedisCluster);
        long start = System.currentTimeMillis();

        List<String> keyList = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keyList.add(i + "");
        }
        //        List<String> pipeline = JedisClusterPipeline.pipeline(this::getValue,keyList,jedisCluster);
        //        List<String> pipeline = JedisClusterPipeline.pipeline(this::getHashValue,jedisCluster);
        String[] keys = {"dylan-test1","dylan-test2"};

        List<Map<String,String>> all = JedisClusterPipeline.pipelineHgetAll(keys,jedisCluster);
        long end = System.currentTimeMillis();
        System.out.println("testPipelineOperate cost:" + (end-start));

        return Response.success(all);
    }

複製程式碼