redis叢集客戶端JedisCluster優化
Redis在3.0版正式引入了叢集這個特性,擴充套件變得非常簡單。然而當你開心的升級到3.0後,卻發現有些很好用的功能現在工作不了了, 比如我們今天要聊的pipeline功能。
我們知道,普通的情況下,redis client與server之間採用的是請求應答的模式,即:
Client: command1
Server: response1
Client: command2
Server: response2
…
在這種情況下,如果要完成10個命令,則需要20次互動才能完成。因此,即使redis處理能力很強,仍然會受到網路傳輸影響,導致吞吐上不去。而在管道模式下,多個請求變成這樣:
Client: command1,command2…
Server: response1,response2…
在這種情況下,完成命令只需要2次互動。這樣網路傳輸上能夠更加高效,加上redis本身強勁的處理能力,是不是有一種飛一樣的感覺。聽到這裡有沒有去優化應用的衝動? 然而到了cluster模式下,這樣的功能並不支援。 下面我們先來分析下,是什麼原因導致redis cluter沒辦法支援管道模式。首先需要了解叢集下的幾個特性:
- 1、叢集將空間分拆為16384個槽位(slot),每一個節點負責其中一些槽位。遷移時對整個slot遷移
- 2、節點新增,或宕機或不可達的情況下可以正常使用
- 3、不存在中心或者代理節點, 每個節點中都包含叢集所有的節點資訊
- 4、叢集中的節點不會代理請求:即如果client將命令傳送到錯誤的節點上,操作會失敗,但會返回”-MOVED”或”-ASK”,供client進行永久或臨時的節點切換
以上資訊中第3、4點資訊比較重要。
我們先來看第3點,由於每個節點都包含所有的節點資訊,因此client連線任一節點都可以獲取整個叢集的資訊,這樣我們在配置JedisCluster時只需要配置其中一部分節點的資訊就可以(配置多個是為了高可用)。對應的獲取叢集命令為:cluster nodes
127.0.0.1:9380> cluster nodes
b6d0cfe64dbae9590e6fc4c5a8e309debcbe0529 127.0.0.1:9380 myself,master - 0 0 2 connected 5461-10922
b9e5592558aae0f28c79c3750b264d5b2530f6a4 127.0.0.1:9381 master - 0 1466758609932 3 connected 10923-16383
b40095eb2023653eaea5b7b4e242a77a7817889a 127.0.0.1:9379 master - 0 1466758608932 1 connected 0-5460
每一行代表一個節點的資訊,這裡共三個節點(測試用,沒有建slave節點),依次的資訊為:
{id} {ip:port} {flags如master/slave} {master id} {ping-sent} {pong-recv} {config-epoch} {link-state} {slot} {slot} … {slot}
參考: http://redis.io/commands/cluster-nodes
可以看到每個節點對應的slot資訊都在這裡,{slot}格式一般是{begin}-{end}(如0-5460),表示從{begin}到{end}的所有slot都在當前節點中。因此我們可以通過slot找到對應機器的ip:port。 注意,新版client中使用cluster slots獲取對應資料 , 參考: http://redis.io/commands/cluster-slots 。
再來看第4點,由第3點可以知道client可以通過獲取所有節點資訊,根據key計算得到對應的slot後可以找到對應的節點。所以說在節點穩定(沒有增減)的情況下,客戶端可以一直用快取的叢集資訊來發起各種命令。然而,如果節點發生變更客戶端是否能夠立即感知? 目前的client JedisCluster是無法感知的,他是通過執行命令後, 服務端返回的“-MOVED”資訊感知節點的變化,並以此來重新整理快取資訊。
瞭解以上資訊以後,JedisCluster為什麼不支援pipeline就比較清晰了。 因為pipeline模式下命令將被快取到對應的連線(OutputStream)上,而在真正向服務端傳送資料時,節點可能發生了改變,資料就可能發向了錯誤的節點,這導致批量操作失敗,而要處理這種失敗是非常複雜的。至少目前JedisCluster並未提供這樣的機制。(對於單key來說,在發生這種情況的時候,進行簡單的節點資料重新整理+重新發送當前命令來重試)。
看到這裡,你可能會感到沮喪(我猿類如此不易,且行且珍惜)。這裡提供一個簡單的思路,你可以根據單key的邏輯,如果某些key遇到”-MOVE”或”-ASK”則重試。 根據這個思路,你需要按順序記錄所有的命令,每次執行完成後找出異常的資料,重新整理節點資訊後重試,最終將重試(可能有多次)獲取到的結果根據順序資訊插入返回列表。對於重試多次依然失敗的資料,交由業務處理。思路很簡單,然而redis命令太多了,要對PipelineBase的每個方法都這樣改造,我不想(因為我懶呀),而且估計坑也很多,所以這個只能靠你自己去搞了。
下面我說下針對我們的業務做的一個JedisCluster pipeline實現。對應的業務有以下特點:
- 資料為每隔一段時間全量匯入redis叢集,資料量約xx萬(xx較大)
- 匯入任務為後臺執行,可重試,最終如果有部分失敗可接受
- 叢集相對較穩定,不會頻繁的加減機器
- 線上業務不使用該api
package com.yam.common.redis;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.Client;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;
/**
* 在叢集模式下提供批量操作的功能。 <br/>
* 由於叢集模式存在節點的動態新增刪除,且client不能實時感知(只有在執行命令時才可能知道叢集發生變更),
* 因此,該實現不保證一定成功,建議在批量操作之前呼叫 refreshCluster() 方法重新獲取叢集資訊。<br />
* 應用需要保證不論成功還是失敗都會呼叫close() 方法,否則可能會造成洩露。<br/>
* 如果失敗需要應用自己去重試,因此每個批次執行的命令數量需要控制。防止失敗後重試的數量過多。<br />
* 基於以上說明,建議在叢集環境較穩定(增減節點不會過於頻繁)的情況下使用,且允許失敗或有對應的重試策略。<br />
*
*
* @author youaremoon
* @version
* @since Ver 1.1
*/
@NotThreadSafe
public class JedisClusterPipeline extends PipelineBase implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);
// 部分欄位沒有對應的獲取方法,只能採用反射來做
// 你也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問介面
private static final Field FIELD_CONNECTION_HANDLER;
private static final Field FIELD_CACHE;
static {
FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
}
private JedisSlotBasedConnectionHandler connectionHandler;
private JedisClusterInfoCache clusterInfoCache;
private Queue<Client> clients = new LinkedList<Client>(); // 根據順序儲存每個命令對應的Client
private Map<JedisPool, Jedis> jedisMap = new HashMap<>(); // 用於快取連線
private boolean hasDataInBuf = false; // 是否有資料在快取區
/**
* 根據jedisCluster例項生成對應的JedisClusterPipeline
* @param
* @return
*/
public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
JedisClusterPipeline pipeline = new JedisClusterPipeline();
pipeline.setJedisCluster(jedisCluster);
return pipeline;
}
public JedisClusterPipeline() {
}
public void setJedisCluster(JedisCluster jedis) {
connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
}
/**
* 重新整理叢集資訊,當叢集資訊發生變更時呼叫
* @param
* @return
*/
public void refreshCluster() {
connectionHandler.renewSlotCache();
}
/**
* 同步讀取所有資料. 與syncAndReturnAll()相比,sync()只是沒有對資料做反序列化
*/
public void sync() {
innerSync(null);
}
/**
* 同步讀取所有資料 並按命令順序返回一個列表
*
* @return 按照命令的順序返回所有的資料
*/
public List<Object> syncAndReturnAll() {
List<Object> responseList = new ArrayList<Object>();
innerSync(responseList);
return responseList;
}
private void innerSync(List<Object> formatted) {
HashSet<Client> clientSet = new HashSet<Client>();
try {
for (Client client : clients) {
// 在sync()呼叫時其實是不需要解析結果資料的,但是如果不呼叫get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要呼叫get()來觸發錯誤。
// 其實如果Response的data屬性可以直接獲取,可以省掉解析資料的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了
Object data = generateResponse(client.getOne()).get();
if (null != formatted) {
formatted.add(data);
}
// size相同說明所有的client都已經新增,就不用再呼叫add方法了
if (clientSet.size() != jedisMap.size()) {
clientSet.add(client);
}
}
} catch (JedisRedirectionException jre) {
if (jre instanceof JedisMovedDataException) {
// if MOVED redirection occurred, rebuilds cluster's slot cache,
// recommended by Redis cluster specification
refreshCluster();
}
throw jre;
} finally {
if (clientSet.size() != jedisMap.size()) {
// 所有還沒有執行過的client要保證執行(flush),防止放回連線池後後面的命令被汙染
for (Jedis jedis : jedisMap.values()) {
if (clientSet.contains(jedis.getClient())) {
continue;
}
flushCachedData(jedis);
}
}
hasDataInBuf = false;
close();
}
}
@Override
public void close() {
clean();
clients.clear();
for (Jedis jedis : jedisMap.values()) {
if (hasDataInBuf) {
flushCachedData(jedis);
}
jedis.close();
}
jedisMap.clear();
hasDataInBuf = false;
}
private void flushCachedData(Jedis jedis) {
try {
jedis.getClient().getAll();
} catch (RuntimeException ex) {
}
}
@Override
protected Client getClient(String key) {
byte[] bKey = SafeEncoder.encode(key);
return getClient(bKey);
}
@Override
protected Client getClient(byte[] key) {
Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
Client client = jedis.getClient();
clients.add(client);
return client;
}
private Jedis getJedis(int slot) {
JedisPool pool = clusterInfoCache.getSlotPool(slot);
// 根據pool從快取中獲取Jedis
Jedis jedis = jedisMap.get(pool);
if (null == jedis) {
jedis = pool.getResource();
jedisMap.put(pool, jedis);
}
hasDataInBuf = true;
return jedis;
}
private static Field getField(Class<?> cls, String fieldName) {
try {
Field field = cls.getDeclaredField(fieldName);
field.setAccessible(true);
return field;
} catch (NoSuchFieldException | SecurityException e) {
throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
}
}
@SuppressWarnings({"unchecked" })
private static <T> T getValue(Object obj, Field field) {
try {
return (T)field.get(obj);
} catch (IllegalArgumentException | IllegalAccessException e) {
LOGGER.error("get value fail", e);
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws IOException {
Set<HostAndPort> nodes = new HashSet<HostAndPort>();
nodes.add(new HostAndPort("127.0.0.1", 9379));
nodes.add(new HostAndPort("127.0.0.1", 9380));
JedisCluster jc = new JedisCluster(nodes);
long s = System.currentTimeMillis();
JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jc);
jcp.refreshCluster();
List<Object> batchResult = null;
try {
// batch write
for (int i = 0; i < 10000; i++) {
jcp.set("k" + i, "v1" + i);
}
jcp.sync();
// batch read
for (int i = 0; i < 10000; i++) {
jcp.get("k" + i);
}
batchResult = jcp.syncAndReturnAll();
} finally {
jcp.close();
}
// output time
long t = System.currentTimeMillis() - s;
System.out.println(t);
System.out.println(batchResult.size());
// 實際業務程式碼中,close要在finally中調,這裡之所以沒這麼寫,是因為懶
jc.close();
}
}