Extending tcc-transaction to support the native Redis cluster
Thanks to the original author for contributing the source code.
tcc-transaction uses MySQL as its default store. In our project the persisted transaction data was fairly large, so we kept hitting "data too long" errors (which can be worked around by changing the column type or length). To avoid patching the source for that, we switched to Redis as the persistence layer. In production, however, Redis never runs as a single instance: for reliability you use master/replica replication, and for throughput you use a cluster. The problem: the tcc-transaction source has no support for the Redis cluster. That is the first reason for this extension.
There are several ways to cluster Redis: the native cluster, twemproxy (from Twitter), or other open-source proxies such as codis. None of them supports pattern-matching key lookups, and even the native Redis cluster is accessed through JedisCluster rather than plain Jedis (although a Jedis instance can be obtained from a JedisCluster, as the code below shows). That is the second reason for this extension.
Let's go straight to the key code, with explanations.
1. Define the template-method callback interface for JedisCluster
```java
public interface JedisClusterCallback<T> {

    /**
     * Operation to run against the redis cluster.
     */
    T doInJedisCluster(JedisCluster jedisCluster);
}
```
2. Extend RedisHelper with an execute overload that drives the JedisCluster callback
```java
/**
 * Redis cluster extension.
 *
 * @Method_Name : executeCluster
 * @param jedisCluster
 * @param callback
 * @return T
 * @Creation Date : 2018/6/12
 */
public static <T> T execute(JedisCluster jedisCluster, JedisClusterCallback<T> callback) {
    try {
        return callback.doInJedisCluster(jedisCluster);
    } finally {
        if (jedisCluster != null) {
            // Do NOT call jedisCluster.close() here: JedisCluster returns its
            // underlying Jedis connections to the pool automatically after use.
        }
    }
}
```
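The callback-plus-execute pair is a classic template method: resource handling stays in one place and each call site supplies only the operation. Stripped of the Jedis types, the pattern can be sketched like this (all names here are illustrative, not part of tcc-transaction):

```java
// Minimal template-method sketch (hypothetical names, no Jedis dependency):
// the executor owns the resource lifecycle; callers supply only the operation.
interface ResourceCallback<R, T> {
    T doWithResource(R resource);
}

class ResourceTemplate {
    static <R, T> T execute(R resource, ResourceCallback<R, T> callback) {
        try {
            return callback.doWithResource(resource);
        } finally {
            // Cleanup would go here; with JedisCluster there is nothing to do,
            // since it hands its pooled connections back automatically.
        }
    }
}
```

Every repository method below follows this shape: it wraps its Redis operation in a `JedisClusterCallback` and hands it to `RedisHelper.execute`.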
3. Add a RedisClusterTransactionRepository class extending CachableTransactionRepository. Its methods mirror those of RedisTransactionRepository; the only difference is that every operation goes through the JedisCluster template method. Only the pattern-matching lookup is explained in detail below; for the other create/update/delete operations, JedisCluster is used almost exactly like Jedis.
doFindAll() is the method the scheduled recovery job relies on, and it is where the pattern-matching lookup happens (the TCC monitoring module uses the same kind of lookup, on the same principle).

```java
package org.mengyun.tcctransaction.repository;

import org.apache.log4j.Logger;
import org.mengyun.tcctransaction.Transaction;
import org.mengyun.tcctransaction.repository.helper.ExpandTransactionSerializer;
import org.mengyun.tcctransaction.repository.helper.JedisClusterCallback;
import org.mengyun.tcctransaction.repository.helper.JedisClusterExtend;
import org.mengyun.tcctransaction.repository.helper.RedisHelper;
import org.mengyun.tcctransaction.serializer.JdkSerializationSerializer;
import org.mengyun.tcctransaction.serializer.ObjectSerializer;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import javax.transaction.xa.Xid;
import java.util.*;

/**
 * Created by zc.ding on 2018/06/12
 */
public class RedisClusterTransactionRepository extends CachableTransactionRepository {

    static final Logger logger = Logger.getLogger(RedisClusterTransactionRepository.class.getSimpleName());

    private JedisCluster jedisCluster;
    private JedisClusterExtend jedisClusterExtend;

    private String keyPrefix = "TCC:";

    public void setKeyPrefix(String keyPrefix) {
        this.keyPrefix = keyPrefix;
    }

    private ObjectSerializer serializer = new JdkSerializationSerializer();

    public void setSerializer(ObjectSerializer serializer) {
        this.serializer = serializer;
    }

    public void setJedisClusterExtend(JedisClusterExtend jedisClusterExtend) {
        this.jedisClusterExtend = jedisClusterExtend;
        this.jedisCluster = jedisClusterExtend.getJedisCluster();
    }

    public void setJedisCluster(JedisCluster jedisCluster) {
        this.jedisCluster = jedisCluster;
    }

    @Override
    protected int doCreate(final Transaction transaction) {
        try {
            Long statusCode = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
                @Override
                public Long doInJedisCluster(JedisCluster jedisCluster) {
                    List<byte[]> params = new ArrayList<byte[]>();
                    for (Map.Entry<byte[], byte[]> entry
                            : ExpandTransactionSerializer.serialize(serializer, transaction).entrySet()) {
                        params.add(entry.getKey());
                        params.add(entry.getValue());
                    }
                    Object result = jedisCluster.eval(
                            "if redis.call('exists', KEYS[1]) == 0 then redis.call('hmset', KEYS[1], unpack(ARGV)); return 1; end; return 0;".getBytes(),
                            Arrays.asList(RedisHelper.getRedisKey(keyPrefix, transaction.getXid())),
                            params);
                    return (Long) result;
                }
            });
            return statusCode.intValue();
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected int doUpdate(final Transaction transaction) {
        try {
            Long statusCode = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
                @Override
                public Long doInJedisCluster(JedisCluster jedisCluster) {
                    transaction.updateTime();
                    transaction.updateVersion();
                    List<byte[]> params = new ArrayList<byte[]>();
                    for (Map.Entry<byte[], byte[]> entry
                            : ExpandTransactionSerializer.serialize(serializer, transaction).entrySet()) {
                        params.add(entry.getKey());
                        params.add(entry.getValue());
                    }
                    Object result = jedisCluster.eval(
                            String.format("if redis.call('hget',KEYS[1],'VERSION') == '%s' then redis.call('hmset', KEYS[1], unpack(ARGV)); return 1; end; return 0;",
                                    transaction.getVersion() - 1).getBytes(),
                            Arrays.asList(RedisHelper.getRedisKey(keyPrefix, transaction.getXid())),
                            params);
                    return (Long) result;
                }
            });
            return statusCode.intValue();
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected int doDelete(final Transaction transaction) {
        try {
            Long result = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
                @Override
                public Long doInJedisCluster(JedisCluster jedisCluster) {
                    return jedisCluster.del(RedisHelper.getRedisKey(keyPrefix, transaction.getXid()));
                }
            });
            return result.intValue();
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected Transaction doFindOne(final Xid xid) {
        try {
            Long startTime = System.currentTimeMillis();
            Map<byte[], byte[]> content = RedisHelper.execute(jedisCluster,
                    new JedisClusterCallback<Map<byte[], byte[]>>() {
                @Override
                public Map<byte[], byte[]> doInJedisCluster(JedisCluster jedisCluster) {
                    return jedisCluster.hgetAll(RedisHelper.getRedisKey(keyPrefix, xid));
                }
            });
            logger.info("redis find cost time :" + (System.currentTimeMillis() - startTime));
            if (content != null && content.size() > 0) {
                return ExpandTransactionSerializer.deserialize(serializer, content);
            }
            return null;
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected List<Transaction> doFindAllUnmodifiedSince(Date date) {
        List<Transaction> allTransactions = doFindAll();
        List<Transaction> allUnmodifiedSince = new ArrayList<Transaction>();
        for (Transaction transaction : allTransactions) {
            if (transaction.getLastUpdateTime().compareTo(date) < 0) {
                allUnmodifiedSince.add(transaction);
            }
        }
        return allUnmodifiedSince;
    }

    protected List<Transaction> doFindAll() {
        List<Transaction> list = new ArrayList<Transaction>();
        try {
            // Collect keys as Strings in a Set so that the copies read from a
            // master and its replica collapse into one entry. (A Set<byte[]>
            // would NOT de-duplicate: arrays use identity equality.)
            Set<String> allKeys = new HashSet<String>();
            Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
            String pattern = keyPrefix + "*";
            for (String k : clusterNodes.keySet()) {
                logger.debug("Getting keys from: " + pattern);
                JedisPool jp = clusterNodes.get(k);
                Jedis jedis = jp.getResource();
                try {
                    allKeys.addAll(jedis.keys(pattern));
                } catch (Exception e) {
                    logger.error("Getting keys error: ", e);
                } finally {
                    logger.debug("Connection closed.");
                    jedis.close();
                }
            }
            for (final String key : allKeys) {
                Map<byte[], byte[]> map = jedisCluster.hgetAll(key.getBytes());
                list.add(ExpandTransactionSerializer.deserialize(serializer, map));
            }
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
        return list;
    }
}
```
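The Lua scripts in doCreate and doUpdate give the repository atomic compare-and-set semantics: a create succeeds only if the key is absent, and an update succeeds only if the stored VERSION field equals the transaction's previous version. Those semantics can be modeled in plain Java like this (an in-memory map stands in for the Redis hash; all names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of the two Lua scripts' semantics (illustrative only).
class VersionedHashStore {
    private final Map<String, Map<String, String>> hashes = new HashMap<String, Map<String, String>>();

    // doCreate's script: "if exists(key) == 0 then hmset(...); return 1 end; return 0"
    int createIfAbsent(String key, Map<String, String> fields) {
        if (hashes.containsKey(key)) {
            return 0;
        }
        hashes.put(key, new HashMap<String, String>(fields));
        return 1;
    }

    // doUpdate's script: "if hget(key,'VERSION') == expected then hmset(...); return 1 end; return 0"
    int updateIfVersionMatches(String key, String expectedVersion, Map<String, String> fields) {
        Map<String, String> hash = hashes.get(key);
        if (hash == null || !expectedVersion.equals(hash.get("VERSION"))) {
            return 0;
        }
        hash.putAll(fields);
        return 1;
    }
}
```

Because EVAL runs atomically on the node owning the key, two concurrent updaters cannot both pass the version check, which is exactly the optimistic-lock behavior the repository needs.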
Here is doFindAll() again, with the key points annotated:

```java
protected List<Transaction> doFindAll() {
    List<Transaction> list = new ArrayList<Transaction>();
    try {
        // Be sure to use a Set rather than a List: the same key is read from
        // both a master and its replica below, and the Set de-duplicates.
        // Collect the keys as Strings, because a Set<byte[]> would NOT
        // de-duplicate (arrays use identity equality).
        Set<String> allKeys = new HashSet<String>();
        // getClusterNodes() returns every node in the cluster, masters and
        // replicas alike, even if only the master list was configured.
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        String pattern = keyPrefix + "*";
        for (String k : clusterNodes.keySet()) {
            logger.debug("Getting keys from: " + pattern);
            JedisPool jp = clusterNodes.get(k);
            Jedis jedis = jp.getResource();
            try {
                // Visit each node and collect every key matching the pattern.
                allKeys.addAll(jedis.keys(pattern));
            } catch (Exception e) {
                logger.error("Getting keys error: ", e);
            } finally {
                logger.debug("Connection closed.");
                // close() is mandatory here: this is a plain Jedis connection
                // borrowed from the pool, not the JedisCluster itself.
                jedis.close();
            }
        }
        for (final String key : allKeys) {
            Map<byte[], byte[]> map = jedisCluster.hgetAll(key.getBytes());
            list.add(ExpandTransactionSerializer.deserialize(serializer, map));
        }
    } catch (Exception e) {
        throw new TransactionIOException(e);
    }
    return list;
}
```
The pattern lookup works by visiting every master and replica node and collecting the keys on each node that match the pattern. Because this is a cluster with replication, every key exists in two copies; the Set de-duplicates them, and what remains is the lookup result. JedisCluster is configured from HostAndPort entries: a look at JedisCluster shows its constructors take a Set&lt;HostAndPort&gt;, which is clumsy to wire up. To make configuration easier, RedisClusterTransactionRepository also accepts a JedisClusterExtend helper. The implementation of JedisClusterExtend is straightforward: its custom redisClusterIp property parses an "ip:port,ip:port" string, which makes the configuration much simpler.
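The core of JedisClusterExtend is just splitting the redisClusterIp string into host/port pairs. A minimal sketch of that parsing (class and method names here are hypothetical, not from the source):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the address parsing inside JedisClusterExtend:
// "ip:port,ip:port" -> the node set JedisCluster needs.
class ClusterAddressParser {
    static Set<String> parse(String redisClusterIp) {
        Set<String> nodes = new LinkedHashSet<String>();
        for (String node : redisClusterIp.split(",")) {
            String trimmed = node.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int idx = trimmed.lastIndexOf(':');
            if (idx <= 0 || idx == trimmed.length() - 1) {
                throw new IllegalArgumentException("Bad node address: " + trimmed);
            }
            Integer.parseInt(trimmed.substring(idx + 1)); // port must be numeric
            nodes.add(trimmed);
        }
        return nodes;
    }
}
```

In the real helper each entry would become a `new HostAndPort(host, port)`, and the resulting set would be passed to the JedisCluster constructor together with the pool config.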
For example, the JedisCluster wiring looks like this:
```xml
<!-- redis cluster -->
<bean id="transactionRepository" class="org.mengyun.tcctransaction.repository.RedisClusterTransactionRepository">
    <property name="keyPrefix" value="TCC:CAP:"/>
    <!--<property name="jedisCluster" ref="jedisCluster"/>-->
    <property name="jedisClusterExtend" ref="jedisClusterExtend"/>
</bean>

<bean id="jedisClusterExtend" class="org.mengyun.tcctransaction.repository.helper.JedisClusterExtend">
    <constructor-arg index="0" value="#{tccDb['redis.cluster.ip']}" type="java.lang.String"/>
    <constructor-arg index="1" ref="jedisPoolConfig"/>
</bean>

<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxTotal" value="#{tccDb['redis.total']}"/>
    <property name="maxWaitMillis" value="#{tccDb['redis.max.wait.millis']}"/>
</bean>
```
In the properties file, `redis.cluster.ip=127.0.0.1:7000,127.0.0.1:7001` is all that is needed; compared with JedisCluster's default configuration style, this is much more concise.
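For reference, the corresponding properties file might look like this (the pool values are placeholders, not taken from the source):

```properties
redis.cluster.ip=127.0.0.1:7000,127.0.0.1:7001
redis.total=100
redis.max.wait.millis=3000
```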
Note: when using redis cluster, the Redis client must be Jedis 2.9.0 or later; otherwise initializing JedisCluster fails with an ip/port parsing exception.
That covers the core of adding native Redis cluster support to tcc-transaction; the TCC monitoring service also supports the native cluster now. For more detail, see the dubbo demo and the tcc-transaction-server project in the source; the branch has been submitted to the original author as a pull request. Once again, kudos to the tcc-transaction author's open-source spirit!
Comments and suggestions are welcome.