RedisCluster-Pipeline操作,提升10倍以上響應速度
本文中的程式碼來自我正在寫的分散式快取框架(主要解決快取使用中的各種痛點:快取穿透\redis-cluster pipeline\註解使用等等)。
什麼是pipeLine 為什麼使用pipeLine ?
管道(pipeline)將客戶端 client 與伺服器端的互動明確劃分為單向的傳送請求(Send Request)和接收響應(Receive Response):使用者可以將多個操作連續發給伺服器,但在此期間伺服器端並不對每個操作命令傳送響應資料;全部請求傳送完畢後用戶關閉請求,開始接收響應獲取每個操作命令的響應結果。
管道(pipeline)在某些場景下非常有用,比如有多個操作命令需要被迅速提交至伺服器端,但使用者並不依賴每個操作返回的響應結果,對結果響應也無需立即獲得,那麼管道就可以用來作為優化效能的批處理工具。效能提升的原因主要是減少了 TCP 連線中互動往返的開銷。
不過在程式中使用管道請注意,使用 pipeline 時客戶端將獨佔與伺服器端的連線,此期間將不能進行其他“非管道”型別操作,直至 pipeline 被關閉;如果要同時執行其他操作,可以為 pipeline 操作單獨建立一個連線,將其與常規操作分離開來。
從原理上來看,pipeline就是用一個redis 的Socket連線 去多次執行redis命令(傳送請求)而不必等待響應,當所有請求都執行完畢後再一次性的從這個socket中讀取請求。期間減少了在網路上的無用等待,通常會有3-10倍以上的速度提升:
//非pipeline
[req1]
[==waiting===]
[resp1]
[req2]
[====waiting=====]
[resp2]
//pipeline
[req1][==waiting===]
[req2][==waiting===]
[resp1] [resp2]
pipeline程式碼示例
@Test
public void pipeline() throws UnsupportedEncodingException {
Pipeline p = jedis.pipelined();
p.set("foo", "bar");
p.get("foo");
for(int i=0;i<10;i++){
p.set("foo"+i, "bar");
}
List<Object> results = p.syncAndReturnAll();
}
為什麼RedisCluster無法使用pipeline?
主要是因為redis-cluster的hash分片,如下圖一個3master-3slave 的 redisCluster:
具體的redis命令,會根據key計算出一個槽位(slot),然後根據槽位去特定的節點redis上執行操作。
其中master1代表了 0~5460的槽位,master2代表了 5461~10922的槽位,master1代表了 10923~16383的槽位。
master1(slave1): 0~5460
master2(slave2):5461~10922
master3(slave3):10923~16383
以以下程式碼為例:
for(int i=0;i<10;i++){
p.set("foo"+i, "bar");
}
那麼pipeline中每個單獨的操作,需要根據“key”運算一個槽位(JedisClusterCRC16.getSlot(key)),然後根據槽位去特定的機器執行命令。也就是說一次pipeline操作會使用多個節點的redis連線,而目前JedisCluster是無法支援的。
如何基於JedisCluster擴充套件pipeline?
設計思路(ShardedJedis、redisson也可供參考,):
1.首先要根據key計算出此次pipeline會使用到的節點對於的連線(也就是jedis物件,通常每個節點對應一個Pool)。
2.相同槽位的key,使用同一個jedis.pipeline去執行 命令。
3.合併此次pipeline所有的response返回。
4.連線釋放返回到池中。
也就是講一個JedisCluster下的pipeline分解為每個單節點下獨立的jedisPipeline操作,最後合併response返回。
分享以下部分核心程式碼:
/**
* @author zhangshuo
*/
@Slf4j
public class JedisClusterPipeLine extends PipelineBase implements Closeable {
......
......
private final Queue<Client> orderedClients = new LinkedList<Client>();
/** 一次pipeline過程中使用到的jedis快取 */
private final Map<JedisPool, Jedis> poolToJedisMap = new HashMap<JedisPool, Jedis>();
private final JedisSlotBasedConnectionHandler connectionHandler;
private final JedisClusterInfoCache clusterInfoCache;
public JedisClusterPipeLine(JedisCluster jedisCluster) {
this.connectionHandler = ClassUtils.getValue(jedisCluster, SLOT_BASED__CONNECTION_HANDLER_FIELD);
this.clusterInfoCache = ClassUtils.getValue(connectionHandler, CLUSTER_INFO_CACHE_FIELD);
}
@Override
protected Client getClient(String key) {
return getClient(SafeEncoder.encode(key));
}
@Override
protected Client getClient(byte[] key) {
Client client;
log.debug("size of orderedClients : {} , size of poolToJedis : {} ", orderedClients.size(),
poolToJedisMap.size());
int slot = JedisClusterCRC16.getSlot(key);
JedisPool pool = clusterInfoCache.getSlotPool(slot);
Jedis borrowedJedis = poolToJedisMap.get(pool);
if (null == borrowedJedis) {
borrowedJedis = pool.getResource();
poolToJedisMap.put(pool, borrowedJedis);
}
client = borrowedJedis.getClient();
orderedClients.add(client);
return client;
}
@Override
public void close() {
for (Jedis jedis : poolToJedisMap.values()) {
jedis.close();
}
clean();
orderedClients.clear();
poolToJedisMap.clear();
}
public void sync() {
for (Client client : orderedClients) {
generateResponse(client.getOne());
}
}
/**
* go through all the responses and generate the right response type (warning :
* usually it is a waste of time).
*
* @return A list of all the responses in the order
*/
public List<Object> syncAndReturnAll() {
List<Object> formatted = new ArrayList<Object>();
for (Client client : orderedClients) {
formatted.add(generateResponse(client.getOne()).get());
}
return formatted;
}
public void refreshNodesInfo() {
connectionHandler.renewSlotCache();
}
......
......
}
效能對比(提升10倍以上):
@Test
public void jedisTest() throws UnsupportedEncodingException {
long start2 = System.currentTimeMillis();
try (JedisClusterClient<Object> jc = jedisClusterClient) {
for (int i = 0; i < 100; i++) {
jc.set("NO." + i, "value" + i);
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() - start2);// 5688ms
}
/**
*
*/
@Test
public void clusterPipeline() {
long start = System.currentTimeMillis();
try (JedisClusterPipeLine pipeline = jedisClusterClient.pipelined()) {
for (int i = 0; i < 100; i++) {
pipeline.set("NO." + i, "value" + i);
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() - start);// 174ms
}
}
結論:對於批量操作,響應提升明顯:如上本機測試中,提升了約50倍。