redis叢集事務管理工具類
阿新 • • 發佈:2018-12-25
redis叢集物件JedisCluster不支援事務,但是,叢集裡邊的每個節點支援事務。
redis 叢集管理事務的工具類
public class JedisClusterTransactionManager {
private static ThreadLocal<Object> txThreadLocal = new ThreadLocal<>();
private static ThreadLocal<JedisCluster> clusterThreadLocal= new ThreadLocal<>();
//開啟事務
public static void multi(JedisCluster jedisCluster){
clusterThreadLocal.set(jedisCluster);
}
/**
* 儲存string資料型別
* @param key
* @param value
*/
public static void set(String key,String value) {
Transaction tx = getTxByKey(key);
tx.set(key, value);
}
/**
* 批量儲存string資料型別
* @param key
* @param value
*/
public static void mset(String... keysvalues) {
if(keysvalues!=null && keysvalues.length>0) {
for(int i=0;i<keysvalues.length;i+=2) {
String key = keysvalues[i];
String value = keysvalues[i+1];
Transaction tx = getTxByKey(key);
tx.set(key, value);
}
}
}
/**
* 儲存hash資料型別
* @param key
* @param value
*/
public static void hset(String key, String field,String value) {
Transaction tx = getTxByKey(key);
tx.hset(key, field, value);
}
/**
* 批量儲存hash資料型別
* @param key
* @param value
*/
public static void hmset(String key,Map<String,String> hash) {
Transaction tx = getTxByKey(key);
tx.hmset(key, hash);
}
/**
* 儲存list資料型別
* @param key
* @param value
*/
public static void lpush(String key,String... values) {
Transaction tx = getTxByKey(key);
tx.lpush(key, values);
}
/**
* 儲存set資料型別
* @param key
* @param value
*/
public static void sadd(String key,String... member) {
Transaction tx = getTxByKey(key);
tx.sadd(key, member);
}
/**
* 儲存sorted set資料型別
* @param key
* @param value
*/
public static void zadd(String key,Map<String,Double> scoreMembers) {
Transaction tx = getTxByKey(key);
tx.zadd(key, scoreMembers);
}
/**
* 通用刪除
* @param key
* @param value
*/
public static void del(String... keys) {
if(keys!=null && keys.length>0) {
for(String key:keys) {
Transaction tx = getTxByKey(key);
tx.del(key);
}
}
}
/**
* 刪除hash
* @param key
* @param value
*/
public static void hdel(String key,String... field) {
Transaction tx = getTxByKey(key);
tx.hdel(key, field);
}
/**
* 刪除set
* @param key
* @param value
*/
public static void srem(String key,String... member) {
Transaction tx = getTxByKey(key);
tx.srem(key, member);
}
/**
* 刪除sorted set
* @param key
* @param value
*/
public static void zrem(String key,String... member) {
Transaction tx = getTxByKey(key);
tx.zrem(key, member);
}
/**
* 提交
*/
public static void exec() {
Map<String,Transaction> map = (Map<String,Transaction> )txThreadLocal.get();
for(Entry<String,Transaction> entry:map.entrySet()) {
entry.getValue().exec();
}
}
/**
* 回滾
*/
public static void discard() {
Map<String,Transaction> map = (Map<String,Transaction> )txThreadLocal.get();
for(Entry<String,Transaction> entry:map.entrySet()) {
entry.getValue().discard();
}
}
/**
* 根據key,得到事務物件
* @param cluster
* @param key
* @return
*/
private static Transaction getTxByKey(String key) {
JedisCluster cluster = clusterThreadLocal.get();
Map<String, Transaction> res = (Map<String, Transaction>)txThreadLocal.get();
if(res==null) {
res = new HashMap<>();
}
Map<String, JedisPool> map = cluster.getClusterNodes();
for(Entry<String,JedisPool> entry:map.entrySet()) {
String keyEntry = entry.getKey();
String strArray[] = keyEntry.split(":");
String host = strArray[0];
Integer port = Integer.parseInt(strArray[1]);
Jedis jedis = new Jedis(host, port);
try {
jedis.exists(key);
Transaction tx = jedis.multi();
res.put(key, tx);
txThreadLocal.set(res);
return tx;
} catch (Exception e) {
jedis.close();
}
}
return null;
}
}