Redis 批量操作之 pipeline
阿新 • • 發佈:2019-12-31
業務場景
最近專案中場景需要get一批key的value,因為redis的get操作(不單單是get命令)是阻塞的,如果迴圈取值的話,就算是內網,耗時也是巨大的。所以想到了redis的pipeline命令。
pipeline簡介
非pipeline:client一個請求,redis server一個響應,期間client阻塞
Pipeline:redis的管道命令,允許client將多個請求依次發給伺服器(redis的客戶端,如jedisCluster,lettuce等都實現了對pipeline的封裝),過程中而不需要等待請求的回覆,在最後再一併讀取結果即可。
單機版
單機版比較簡單,直接上程式碼
//換成真實的redis例項
Jedis jedis = new Jedis();
//獲取管道
Pipeline p = jedis.pipelined();
for (int i = 0; i < 10000; i++) {
p.get(i + "");
}
//獲取結果
List<Object> results = p.syncAndReturnAll();
複製程式碼
叢集版
因為 JedisCluster 本身不支援 pipeline ,所以我們需要對 JedisCluster 進行一些封裝。
還是一樣,直接上程式碼
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.util.*;
import java.util.function.BiConsumer;
@Slf4j
public class JedisClusterPipeline extends PipelineBase implements Closeable {
/**
* 用於獲取 JedisClusterInfoCache
*/
private JedisSlotBasedConnectionHandler connectionHandler;
/**
* 根據hash值獲取連線
*/
private JedisClusterInfoCache clusterInfoCache;
/**
* 也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問介面
* JedisCluster繼承於BinaryJedisCluster
* 在BinaryJedisCluster,connectionHandler屬性protected修飾的,所以需要反射
*
*
* 而 JedisClusterInfoCache 屬性在JedisClusterConnectionHandler中,但是這個類是抽象類,
* 但它有一個實現類JedisSlotBasedConnectionHandler
*/
private static final Field FIELD_CONNECTION_HANDLER;
private static final Field FIELD_CACHE;
static {
FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class,"connectionHandler");
FIELD_CACHE = getField(JedisClusterConnectionHandler.class,"cache");
}
/**
* 根據順序儲存每個命令對應的Client
*/
private Queue<Client> clients = new LinkedList<>();
/**
* 用於快取連線
* 一次pipeline過程中使用到的jedis快取
*/
private Map<JedisPool,Jedis> jedisMap = new HashMap<>();
/**
* 是否有資料在快取區
*/
private boolean hasDataInBuf = false;
/**
* 根據jedisCluster例項生成對應的JedisClusterPipeline
* 通過此方式獲取pipeline進行操作的話必須呼叫close()關閉管道
* 呼叫本類裡pipelineXX方法則不用close(),但建議最好還是在finally裡呼叫一下close()
* @param
* @return
*/
public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
JedisClusterPipeline pipeline = new JedisClusterPipeline();
pipeline.setJedisCluster(jedisCluster);
return pipeline;
}
public JedisClusterPipeline() {
}
public void setJedisCluster(JedisCluster jedis) {
connectionHandler = getValue(jedis,FIELD_CONNECTION_HANDLER);
clusterInfoCache = getValue(connectionHandler,FIELD_CACHE);
}
/**
* 重新整理叢集資訊,當叢集資訊發生變更時呼叫
* @param
* @return
*/
public void refreshCluster() {
connectionHandler.renewSlotCache();
}
/**
* 同步讀取所有資料. 與syncAndReturnAll()相比,sync()只是沒有對資料做反序列化
*/
public void sync() {
innerSync(null);
}
/**
* 同步讀取所有資料 並按命令順序返回一個列表
*
* @return 按照命令的順序返回所有的資料
*/
public List<Object> syncAndReturnAll() {
List<Object> responseList = new ArrayList<>();
innerSync(responseList);
return responseList;
}
@Override
public void close() {
clean();
clients.clear();
for (Jedis jedis : jedisMap.values()) {
if (hasDataInBuf) {
flushCachedData(jedis);
}
jedis.close();
}
jedisMap.clear();
hasDataInBuf = false;
}
private void flushCachedData(Jedis jedis) {
try {
jedis.getClient().getAll();
} catch (RuntimeException ex) {
}
}
@Override
protected Client getClient(String key) {
byte[] bKey = SafeEncoder.encode(key);
return getClient(bKey);
}
@Override
protected Client getClient(byte[] key) {
Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
Client client = jedis.getClient();
clients.add(client);
return client;
}
private Jedis getJedis(int slot) {
JedisPool pool = clusterInfoCache.getSlotPool(slot);
// 根據pool從快取中獲取Jedis
Jedis jedis = jedisMap.get(pool);
if (null == jedis) {
jedis = pool.getResource();
jedisMap.put(pool,jedis);
}
hasDataInBuf = true;
return jedis;
}
public static void pipelineSetEx(String[] keys,String[] values,int[] exps,JedisCluster jedisCluster) {
operate(new Command() {
@Override
public List execute() {
JedisClusterPipeline p = pipelined(jedisCluster);
for (int i = 0,len = keys.length; i < len; i++) {
p.setex(keys[i],exps[i],values[i]);
}
return p.syncAndReturnAll();
}
});
}
public static List<Map<String,String>> pipelineHgetAll(String[] keys,JedisCluster jedisCluster) {
return operate(new Command() {
@Override
public List execute() {
JedisClusterPipeline p = pipelined(jedisCluster);
for (int i = 0,len = keys.length; i < len; i++) {
p.hgetAll(keys[i]);
}
return p.syncAndReturnAll();
}
});
}
public static List<Boolean> pipelineSismember(String[] keys,String members,JedisCluster jedisCluster) {
return operate(new Command() {
@Override
public List execute() {
JedisClusterPipeline p = pipelined(jedisCluster);
for (int i = 0,len = keys.length; i < len; i++) {
p.sismember(keys[i],members);
}
return p.syncAndReturnAll();
}
});
}
public static <O> List pipeline(BiConsumer<O,JedisClusterPipeline> function,O obj,JedisCluster jedisCluster) {
return operate(new Command() {
@Override
public List execute() {
JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jedisCluster);
function.accept(obj,jcp);
return jcp.syncAndReturnAll();
}
});
}
private void innerSync(List<Object> formatted) {
HashSet<Client> clientSet = new HashSet<>();
try {
for (Client client : clients) {
// 在sync()呼叫時其實是不需要解析結果資料的,但是如果不呼叫get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要呼叫get()來觸發錯誤。
// 其實如果Response的data屬性可以直接獲取,可以省掉解析資料的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了
Object data = generateResponse(client.getOne()).get();
if (null != formatted) {
formatted.add(data);
}
// size相同說明所有的client都已經新增,就不用再呼叫add方法了
if (clientSet.size() != jedisMap.size()) {
clientSet.add(client);
}
}
} catch (JedisRedirectionException jre) {
if (jre instanceof JedisMovedDataException) {
// if MOVED redirection occurred,rebuilds cluster's slot cache,
// recommended by Redis cluster specification
refreshCluster();
}
throw jre;
} finally {
if (clientSet.size() != jedisMap.size()) {
// 所有還沒有執行過的client要保證執行(flush),防止放回連線池後後面的命令被汙染
for (Jedis jedis : jedisMap.values()) {
if (clientSet.contains(jedis.getClient())) {
continue;
}
flushCachedData(jedis);
}
}
hasDataInBuf = false;
close();
}
}
private static Field getField(Class<?> cls,String fieldName) {
try {
Field field = cls.getDeclaredField(fieldName);
field.setAccessible(true);
return field;
} catch (NoSuchFieldException | SecurityException e) {
throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(),e);
}
}
@SuppressWarnings({"unchecked" })
private static <T> T getValue(Object obj,Field field) {
try {
return (T)field.get(obj);
} catch (IllegalArgumentException | IllegalAccessException e) {
log.error("get value fail",e);
throw new RuntimeException(e);
}
}
private static <T> T operate(Command command) {
try {
return command.execute();
} catch (Exception e) {
log.error("redis operate error");
throw new RuntimeException(e);
}
}
interface Command {
/**
* 具體執行命令
*
* @param <T>
* @return
*/
<T> T execute();
}
}
複製程式碼
使用demo
public Object testPipelineOperate() {
// String[] keys = {"dylan1","dylan2"};
// String[] values = {"dylan1-v1","dylan2-v2"};
// int[] exps = {100,200};
// JedisClusterPipeline.pipelineSetEx(keys,values,exps,jedisCluster);
long start = System.currentTimeMillis();
List<String> keyList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
keyList.add(i + "");
}
// List<String> pipeline = JedisClusterPipeline.pipeline(this::getValue,keyList,jedisCluster);
// List<String> pipeline = JedisClusterPipeline.pipeline(this::getHashValue,jedisCluster);
String[] keys = {"dylan-test1","dylan-test2"};
List<Map<String,String>> all = JedisClusterPipeline.pipelineHgetAll(keys,jedisCluster);
long end = System.currentTimeMillis();
System.out.println("testPipelineOperate cost:" + (end-start));
return Response.success(all);
}
複製程式碼