1. 程式人生 > >RedisCluster-Pipeline操作,提升10倍以上響應速度

RedisCluster-Pipeline操作,提升10倍以上響應速度

本文中的程式碼來自我正在寫的分散式快取框架(主要解決快取使用中的各種痛點:快取穿透\redis-cluster pipeline\註解使用等等)。

什麼是pipeLine 為什麼使用pipeLine ?

管道(pipeline)將客戶端 client 與伺服器端的互動明確劃分為單向的傳送請求(Send Request)和接收響應(Receive Response):使用者可以將多個操作連續發給伺服器,但在此期間伺服器端並不對每個操作命令傳送響應資料;全部請求傳送完畢後用戶關閉請求,開始接收響應獲取每個操作命令的響應結果。

管道(pipeline)在某些場景下非常有用,比如有多個操作命令需要被迅速提交至伺服器端,但使用者並不依賴每個操作返回的響應結果,對結果響應也無需立即獲得,那麼管道就可以用來作為優化效能的批處理工具。效能提升的原因主要是減少了 TCP 連線中互動往返的開銷。

不過在程式中使用管道請注意,使用 pipeline 時客戶端將獨佔與伺服器端的連線,此期間將不能進行其他“非管道”型別操作,直至 pipeline 被關閉;如果要同時執行其他操作,可以為 pipeline 操作單獨建立一個連線,將其與常規操作分離開來。

從原理上來看,pipeline就是用一個redis 的Socket連線 去多次執行redis命令(傳送請求)而不必等待響應,當所有請求都執行完畢後再一次性的從這個socket中讀取請求。期間減少了在網路上的無用等待,通常會有3-10倍以上的速度提升:

    //非pipeline
    [req1]
         [==waiting===]
[resp1] [req2] [====waiting=====] [resp2] //pipeline [req1][==waiting===] [req2][==waiting===] [resp1] [resp2]

pipeline程式碼示例

@Test

  public void pipeline() throws UnsupportedEncodingException {

    Pipeline p = jedis.pipelined();

    p.set("foo", "bar");

    p.get("foo");

    for(int i=0;i<10;i++){

        p.set("foo"+i, "bar");
    }


    List<Object> results = p.syncAndReturnAll();

  }

為什麼RedisCluster無法使用pipeline?

主要是因為redis-cluster的hash分片,如下圖一個3master-3slave 的 redisCluster:

這裡寫圖片描述

具體的redis命令,會根據key計算出一個槽位(slot),然後根據槽位去特定的節點redis上執行操作。

其中master1代表了 0~5460的槽位,master2代表了 5461~10922的槽位,master1代表了 10923~16383的槽位。

    master1(slave1): 0~5460

    master2(slave2):5461~10922

    master3(slave3):10923~16383

以以下程式碼為例:

        for(int i=0;i<10;i++){
            p.set("foo"+i, "bar");
        }

那麼pipeline中每個單獨的操作,需要根據“key”運算一個槽位(JedisClusterCRC16.getSlot(key)),然後根據槽位去特定的機器執行命令。也就是說一次pipeline操作會使用多個節點的redis連線,而目前JedisCluster是無法支援的。

如何基於JedisCluster擴充套件pipeline?

設計思路(ShardedJedis、redisson也可供參考,):

1.首先要根據key計算出此次pipeline會使用到的節點對於的連線(也就是jedis物件,通常每個節點對應一個Pool)。

2.相同槽位的key,使用同一個jedis.pipeline去執行 命令。

3.合併此次pipeline所有的response返回。

4.連線釋放返回到池中。

也就是講一個JedisCluster下的pipeline分解為每個單節點下獨立的jedisPipeline操作,最後合併response返回。

分享以下部分核心程式碼:



/**
 * @author zhangshuo
 */
@Slf4j
public class JedisClusterPipeLine extends PipelineBase implements Closeable {

......
......

    private final Queue<Client> orderedClients = new LinkedList<Client>();

    /** 一次pipeline過程中使用到的jedis快取 */
    private final Map<JedisPool, Jedis> poolToJedisMap = new HashMap<JedisPool, Jedis>();

    private final JedisSlotBasedConnectionHandler connectionHandler;
    private final JedisClusterInfoCache clusterInfoCache;

    public JedisClusterPipeLine(JedisCluster jedisCluster) {
        this.connectionHandler = ClassUtils.getValue(jedisCluster, SLOT_BASED__CONNECTION_HANDLER_FIELD);
        this.clusterInfoCache = ClassUtils.getValue(connectionHandler, CLUSTER_INFO_CACHE_FIELD);
    }

    @Override
    protected Client getClient(String key) {

        return getClient(SafeEncoder.encode(key));
    }

    @Override
    protected Client getClient(byte[] key) {

        Client client;
        log.debug("size of orderedClients : {} , size of poolToJedis : {} ", orderedClients.size(),
                poolToJedisMap.size());

        int slot = JedisClusterCRC16.getSlot(key);

        JedisPool pool = clusterInfoCache.getSlotPool(slot);

        Jedis borrowedJedis = poolToJedisMap.get(pool);

        if (null == borrowedJedis) {
            borrowedJedis = pool.getResource();
            poolToJedisMap.put(pool, borrowedJedis);
        }

        client = borrowedJedis.getClient();

        orderedClients.add(client);

        return client;
    }

    @Override
    public void close() {
        for (Jedis jedis : poolToJedisMap.values()) {
            jedis.close();
        }

        clean();
        orderedClients.clear();
        poolToJedisMap.clear();
    }

    public void sync() {
        for (Client client : orderedClients) {
            generateResponse(client.getOne());
        }
    }

    /**
     * go through all the responses and generate the right response type (warning :
     * usually it is a waste of time).
     * 
     * @return A list of all the responses in the order
     */
    public List<Object> syncAndReturnAll() {
        List<Object> formatted = new ArrayList<Object>();
        for (Client client : orderedClients) {
            formatted.add(generateResponse(client.getOne()).get());
        }
        return formatted;
    }

    public void refreshNodesInfo() {
        connectionHandler.renewSlotCache();
    }
......
......
}

效能對比(提升10倍以上):

    @Test
    public void jedisTest() throws UnsupportedEncodingException {

        long start2 = System.currentTimeMillis();

        try (JedisClusterClient<Object> jc = jedisClusterClient) {
            for (int i = 0; i < 100; i++) {
                jc.set("NO." + i, "value" + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - start2);// 5688ms

    }

    /**
     *
     */
    @Test
    public void clusterPipeline() {
        long start = System.currentTimeMillis();
        try (JedisClusterPipeLine pipeline = jedisClusterClient.pipelined()) {
            for (int i = 0; i < 100; i++) {

                pipeline.set("NO." + i, "value" + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - start);// 174ms
    }
}

結論:對於批量操作,響應提升明顯:如上本機測試中,提升了約50倍。