1. 程式人生 > >tcc transaction擴充套件redis原生叢集

tcc transaction擴充套件redis原生叢集

感謝作者貢獻原始碼,傳送門

        在使用tcc的過程,預設是使用mysql,在專案中使用時,因為儲存的資料量較大,導致總是出現數據值過大的錯誤(通過更新欄位型別或更新欄位長度可以解決), 於是為了不更新原始碼,採用了redis作為了持久層,但是線上的環境redis是不會單例項的,保證高可靠,勢必會採用主從,提高效能,勢必採用叢集,那麼問題來了,tcc原始碼中未對redis的叢集進行支援。這是擴充套件原始碼原因一。

        redis實現叢集有多種方式,原生叢集,通過twemproxy(twitter)或是其他的開源外掛如codis都能實現叢集,但是他們都不支援key的模糊查詢,及時redis原生的叢集也是使用jedisCluster而不用redis(通過jedisCluster能得到jedis,後面程式碼中有使用示例)。此為擴充套件原始碼的原因之二。

        直接上關鍵程式碼並加以說明

1、定義jedisCluster的模板方法介面

public interface JedisClusterCallback<T> {
    /**
    *  支援redis cluster
    */
    public T doInJedisCluster(JedisCluster jedisCluster);
}

2、擴充套件RedisHelper新增JedisCluster模板方法的呼叫

/**
     *  擴充套件redis cluster
     *  @Method_Name             :executeCluster
     *  @param jedisCluster
     *  @param callback
     *  @return T
     *  @Creation Date           :2018/6/12
     */
    public static <T> T execute(JedisCluster jedisCluster, JedisClusterCallback<T> callback) {
        try {
            return callback.doInJedisCluster(jedisCluster);
        } finally {
            if (jedisCluster != null) {
//                次數不要執行jedisCluster.close()的方法,jedisCluster在使用結束後會自動釋放jedis資源
            }
        }
    }

3、新增RedisClusterTransactionReository類並實現CachableTransactionRepository介面,RedisClusterTransactionReository與RedisTransactionReository介面方法一樣,只是呼叫的JedisCluster的模板方法進行值得操作,後面只對模糊查詢的實現進行說明,其他的增刪,JedisCluster與Jedis的用法基本一致。

package org.mengyun.tcctransaction.repository;

import org.apache.log4j.Logger;
import org.mengyun.tcctransaction.Transaction;
import org.mengyun.tcctransaction.repository.helper.ExpandTransactionSerializer;
import org.mengyun.tcctransaction.repository.helper.JedisClusterCallback;
import org.mengyun.tcctransaction.repository.helper.JedisClusterExtend;
import org.mengyun.tcctransaction.repository.helper.RedisHelper;
import org.mengyun.tcctransaction.serializer.JdkSerializationSerializer;
import org.mengyun.tcctransaction.serializer.ObjectSerializer;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;

import javax.transaction.xa.Xid;
import java.util.*;

/**
 * Created by zc.ding on 2018/06/12
 */
public class RedisClusterTransactionRepository extends CachableTransactionRepository {

    static final Logger logger = Logger.getLogger(RedisClusterTransactionRepository.class.getSimpleName());

    private JedisCluster jedisCluster;
    
    private JedisClusterExtend jedisClusterExtend;

    private String keyPrefix = "TCC:";

    public void setKeyPrefix(String keyPrefix) {
        this.keyPrefix = keyPrefix;
    }

    private ObjectSerializer serializer = new JdkSerializationSerializer();

    public void setSerializer(ObjectSerializer serializer) {
        this.serializer = serializer;
    }

    public void setJedisClusterExtend(JedisClusterExtend jedisClusterExtend) {
        this.jedisClusterExtend = jedisClusterExtend;
        this.jedisCluster = jedisClusterExtend.getJedisCluster();
    }

    public void setJedisCluster(JedisCluster jedisCluster) {
        this.jedisCluster = jedisCluster;
    }

    @Override
    protected int doCreate(final Transaction transaction) {
        try {
            Long statusCode = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
                @Override
                public Long doInJedisCluster(JedisCluster jedisCluster) {
                    List<byte[]> params = new ArrayList<byte[]>();
                    for (Map.Entry<byte[], byte[]> entry : ExpandTransactionSerializer.serialize(serializer, transaction).entrySet()) {
                        params.add(entry.getKey());
                        params.add(entry.getValue());
                    }
                    Object result = jedisCluster.eval("if redis.call('exists', KEYS[1]) == 0 then redis.call('hmset', KEYS[1], unpack(ARGV)); return 1; end; return 0;".getBytes(),
                            Arrays.asList(RedisHelper.getRedisKey(keyPrefix, transaction.getXid())), params);
                    return (Long) result;
                }
            });
            return statusCode.intValue();
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected int doUpdate(final Transaction transaction) {
        try {
            Long statusCode = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
                @Override
                public Long doInJedisCluster(JedisCluster jedisCluster) {
                    transaction.updateTime();
                    transaction.updateVersion();
                    List<byte[]> params = new ArrayList<byte[]>();
                    for (Map.Entry<byte[], byte[]> entry : ExpandTransactionSerializer.serialize(serializer, transaction).entrySet()) {
                        params.add(entry.getKey());
                        params.add(entry.getValue());
                    }
                    Object result = jedisCluster.eval(String.format("if redis.call('hget',KEYS[1],'VERSION') == '%s' then redis.call('hmset', KEYS[1], unpack(ARGV)); return 1; end; return 0;",
                            transaction.getVersion() - 1).getBytes(),
                            Arrays.asList(RedisHelper.getRedisKey(keyPrefix, transaction.getXid())), params);

                    return (Long) result;
                }
            });
            return statusCode.intValue();
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected int doDelete(final Transaction transaction) {
        try {
            Long result = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
                @Override
                public Long doInJedisCluster(JedisCluster jedisCluster) {
                    return jedisCluster.del(RedisHelper.getRedisKey(keyPrefix, transaction.getXid()));
                }
            });
            return result.intValue();
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected Transaction doFindOne(final Xid xid) {
        try {
            Long startTime = System.currentTimeMillis();
            Map<byte[], byte[]> content = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Map<byte[], byte[]>>() {
                @Override
                public Map<byte[], byte[]> doInJedisCluster(JedisCluster jedisCluster) {
                    return jedisCluster.hgetAll(RedisHelper.getRedisKey(keyPrefix, xid));
                }
            });
            logger.info("redis find cost time :" + (System.currentTimeMillis() - startTime));
            if (content != null && content.size() > 0) {
                return ExpandTransactionSerializer.deserialize(serializer, content);
            }
            return null;
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected List<Transaction> doFindAllUnmodifiedSince(Date date) {
        List<Transaction> allTransactions = doFindAll();
        List<Transaction> allUnmodifiedSince = new ArrayList<Transaction>();
        for (Transaction transaction : allTransactions) {
            if (transaction.getLastUpdateTime().compareTo(date) < 0) {
                allUnmodifiedSince.add(transaction);
            }
        }
        return allUnmodifiedSince;
    }

    protected List<Transaction> doFindAll() {
        List<Transaction> list = new ArrayList<Transaction>();
        try {
            Set<byte[]> allKeys = new HashSet<byte[]>();
            Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
            String pattern = keyPrefix + "*";
            for(String k : clusterNodes.keySet()){
                logger.debug("Getting keys from: " + pattern);
                JedisPool jp = clusterNodes.get(k);
                Jedis jedis = jp.getResource();
                try {
                    allKeys.addAll(jedis.keys(pattern.getBytes()));
                } catch(Exception e){
                    logger.error("Getting keys error: {}", e);
                } finally{
                    logger.debug("Connection closed.");
                    jedis.close();
                }
            }
            for (final byte[] key : allKeys) {
                Map<byte[], byte[]> map = jedisCluster.hgetAll(key);
                list.add(ExpandTransactionSerializer.deserialize(serializer, map));
            }
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
        return list;
    }
}
doFindAll()是定時依賴的方法,此處用到了模糊查詢(TCC監控的模組也用到了模糊查詢,原理基本一致),
protected List<Transaction> doFindAll() {
        List<Transaction> list = new ArrayList<Transaction>();
        try {
            Set<byte[]> allKeys = new HashSet<byte[]>();//一定不要用List<byte[]>,只能用Set,防止下面從redis從節點取出重複資料
            Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();//獲得叢集的所有節點,主從節點都有,及時我們定義是隻定義了主節點列表
            String pattern = keyPrefix + "*";
            for(String k : clusterNodes.keySet()){
                logger.debug("Getting keys from: " + pattern);
                JedisPool jp = clusterNodes.get(k);
                Jedis jedis = jp.getResource();
                try {
                    allKeys.addAll(jedis.keys(pattern.getBytes()));//遍歷主從節點,拿出所有滿足的key
                } catch(Exception e){
                    logger.error("Getting keys error: {}", e);
                } finally{
                    logger.debug("Connection closed.");
                    jedis.close();//此處一點要close(), 因為此時用的redis而不是redisCluster
                }
            }
            for (final byte[] key : allKeys) {
                Map<byte[], byte[]> map = jedisCluster.hgetAll(key);
                list.add(ExpandTransactionSerializer.deserialize(serializer, map));
            }
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
        return list;
    }
其模糊查詢的原理,就是遍歷所有的主從節點,遍歷每一個節點上的滿足條件的key,由於是叢集+主從,所有相同的key一定會有兩份,通過Set自身去重,剩下就是我們這次模糊查詢結果。

JedisCluster的配置是基於HostAndPort配置,檢視JedisCluster發現配置叢集依賴Set<HostAndPort>,為了便於配置在RedisClusterTransactionReository中添加了jedisClusterExtend的擴充套件方便配置。jedisClusterExtend實現方式相對清晰,自定義屬性redisClusterIp支援ip:port,ip:port方式解析,這個配置就簡單很多了。

例如配置JedisCluster叢集如下

<!-- redisl cluster -->
    <bean id="transactionRepository" class="org.mengyun.tcctransaction.repository.RedisClusterTransactionRepository">
        <property name="keyPrefix" value="TCC:CAP:"/>
        <!--<property name="jedisCluster" ref="jedisCluster"/>-->
        <property name="jedisClusterExtend" ref="jedisClusterExtend"/>
    </bean> 
    <bean id="jedisClusterExtend" class="org.mengyun.tcctransaction.repository.helper.JedisClusterExtend">
        <constructor-arg index="0" value="#{tccDb['redis.cluster.ip']}" type="java.lang.String"/>
        <constructor-arg index="1" ref="jedisPoolConfig"/>
    </bean>    
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxTotal" value="#{tccDb['redis.total']}"/>
        <property name="maxWaitMillis" value="#{tccDb['redis.max.wait.millis']}"/>
    </bean>

properties中屬性redis.cluster.ip=127.0.0.1:7000,127.0.0.1:7001即可,可以對比JedisCluster預設配置,會發現簡潔很多。

      注意,使用redis_cluster時,redis的客戶端需使用2.9.0以上版本,否則初始化JedisCluster會出現ip埠解析失敗的異常。    

      至此tcc-transaction支援redis遠端叢集的核心內容已說明,TCC監控的服務也已支援redis原生叢集。更多詳細內容請看原始碼中dubbo的demo和tcc-transcation-server專案,分支request已提交給原作者。這裡再次為tcc原作者開源精神點贊!

       歡迎各位指點交流......