1. 程式人生 > >Codis原始碼解析——Jodis

Codis原始碼解析——Jodis

我們在java專案裡面連線已經搭建好的Codis叢集時,需要用到其java客戶端——Jodis。這一篇我們就來看看Jodis是如何操作對Codis叢集進行操作的。

import io.codis.jodis.JedisResourcePool;
import io.codis.jodis.RoundRobinJedisPool;
import redis.clients.jedis.Jedis;
/**
 * @author wujiang
 * @version 1.0.0
 * @date 2017/7/12
 */
public class CodisDemo {
    public static
void main(String[] args) { JedisResourcePool jedisPool = RoundRobinJedisPool.create() .curatorClient("10.0.2.15:2181", 30000).zkProxyDir("/jodis/codis-wujiang").build(); try (Jedis jedis = jedisPool.getResource()) { //省略程式碼 } } }

如果在執行中遇到報錯

Redis.clients.jedis.exceptions.JedisException
: Proxy list empty

Jodis中最重要的一個類就是RoundRobinJedisPool。首先來看看這個類的繼承結構和方法

這裡寫圖片描述

這裡寫圖片描述

在RoundRobinJedisPool.class中有一個內部類Builder,是用來與zk叢集進行連線的。在build之前,一直都是在設定Builder的屬性,例如將Builder的connectionTimeoutMs和soTimeoutMs分別設定為2000,database設定為0,傳入zk地址和路徑等等。然後會呼叫下面的build方法

public static final class Builder {
        private
CuratorFramework curatorClient; private boolean closeCurator; private String zkProxyDir; private String zkAddr; private int zkSessionTimeoutMs; private JedisPoolConfig poolConfig; private int connectionTimeoutMs; private int soTimeoutMs; private String password; private int database; private String clientName; . public RoundRobinJedisPool build() { this.validate(); return new RoundRobinJedisPool(this.curatorClient, this.closeCurator, this.zkProxyDir, this.poolConfig, this.connectionTimeoutMs, this.soTimeoutMs, this.password, this.database, this.clientName, null); } . private void validate() { //首先檢查zkProxyDir和zkAddr是否為空 Preconditions.checkNotNull(this.zkProxyDir, "zkProxyDir can not be null"); //初始化curatorClient並啟動 if (this.curatorClient == null) { Preconditions.checkNotNull(this.zkAddr, "zk client can not be null"); this.curatorClient = CuratorFrameworkFactory.builder().connectString(this.zkAddr).sessionTimeoutMs(this.zkSessionTimeoutMs).retryPolicy(new BoundedExponentialBackoffRetryUntilElapsed(100, 30000, -1L)).build(); this.curatorClient.start(); this.closeCurator = true; } else if (this.curatorClient.getState() == CuratorFrameworkState.LATENT) { this.curatorClient.start(); } if (this.poolConfig == null) { this.poolConfig = new JedisPoolConfig(); } } }

初始化的CuratorClient如下所示,curator-client元件可以作為zookeeper client來使用,它提供了zk例項建立/重連機制等

這裡寫圖片描述

最關鍵的是後面建立RoundRobinJedisPool這一步。

private RoundRobinJedisPool(CuratorFramework curatorClient, boolean closeCurator, String zkProxyDir, JedisPoolConfig poolConfig, int connectionTimeoutMs, int soTimeoutMs, String password, int database, String clientName) {
    //新建ImmutableList<PooledObject>,每個PooledObject包含了proxy addr以及jedisPool
    this.pools = ImmutableList.of();
    this.nextIdx = new AtomicInteger(-1);
    this.poolConfig = poolConfig;
    this.connectionTimeoutMs = connectionTimeoutMs;
    this.soTimeoutMs = soTimeoutMs;
    this.password = password;
    this.database = database;
    this.clientName = clientName;
    this.curatorClient = curatorClient;
    //true
    this.closeCurator = closeCurator;
    this.watcher = new PathChildrenCache(curatorClient, zkProxyDir, true);
    //監聽zkProxyDir下面的變化,也就是當叢集中新加入proxy或者有proxy宕機之後,都會由watcher得到該訊息
    this.watcher.getListenable().addListener(new PathChildrenCacheListener() {
        private void logEvent(PathChildrenCacheEvent event) {
            StringBuilder msg = new StringBuilder("Receive child event: ");
            msg.append("type=").append(event.getType());
            ChildData data = event.getData();
            if (data != null) {
                msg.append(", path=").append(data.getPath());
                msg.append(", stat=").append(data.getStat());
                if (data.getData() != null) {
                    msg.append(", bytes length=").append(data.getData().length);
                } else {
                    msg.append(", no bytes");
                }
            } else {
                msg.append(", no data");
            }

            RoundRobinJedisPool.LOG.info(msg.toString());
        }

        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            this.logEvent(event);
            //如果zk下監聽事件型別為新增、更新、刪除(這是通過靜態程式碼塊注入的),就重置pool
            if (RoundRobinJedisPool.RESET_TYPES.contains(event.getType())) {
                RoundRobinJedisPool.this.resetPools();
            }

        }
    });

    try {
        //以BUILD_INITIAL_CACHE模式啟動watcher
        this.watcher.start(StartMode.BUILD_INITIAL_CACHE);
    } catch (Exception var11) {
        this.close();
        throw new JedisException(var11);
    }

    this.resetPools();
}
//啟動watcher的方法
public void rebuild() throws Exception {
    Preconditions.checkState(!this.executorService.isShutdown(), "cache has been closed");
    this.ensurePath();
    this.clear();
    ///jodis/codis-wujiang下面的proxy,本例中只有一個proxy-00e6b8c9d5cc40ed5f81ea515b2b7695
    List<String> children = (List)this.client.getChildren().forPath(this.path);
    Iterator var2 = children.iterator();

    while(var2.hasNext()) {
        String child = (String)var2.next();
        ///jodis/codis-wujiang/proxy-00e6b8c9d5cc40ed5f81ea515b2b7695
        String fullPath = ZKPaths.makePath(this.path, child);
        this.internalRebuildNode(fullPath);
        if (this.rebuildTestExchanger != null) {
            this.rebuildTestExchanger.exchange(new Object());
        }
    }

    this.offerOperation(new RefreshOperation(this, PathChildrenCache.RefreshMode.FORCE_GET_DATA_AND_STAT));
    }

下一步,resetPool,根據zk中的proxy以及對應的jedis變化,更新叢集中的redisPool。這樣pool中永遠都有每個proxy以及proxy中對應的jedisPool。

注意,下面的addr2Pool是RoundRobinJedisPool當前的pool,從zk當前的資料中取出現在的所有proxy,從addr2Pool中刪除,並放入builder中。如果刪除的時候為null,證明原來的pool中並沒有這個proxy連線,於是新建一個並放到builder中。最後的RoundRobinJedisPool.pool相當於是builder.build出來的。再將addr2Pool中剩餘的proxy連線關閉(這裡用了一個trick,JedisPool實際上是proxy連線)

private static final class PooledObject {
    //addr是proxy的19000埠地址
    public final String addr;
    public final JedisPool pool;

    public PooledObject(String addr, JedisPool pool) {
        this.addr = addr;
        this.pool = pool;
    }
}
 private void resetPools() {
     ImmutableList<RoundRobinJedisPool.PooledObject> pools = this.pools;
     Map<String, RoundRobinJedisPool.PooledObject> addr2Pool = Maps.newHashMapWithExpectedSize(pools.size());
     UnmodifiableIterator var3 = pools.iterator();

     while(var3.hasNext()) {
         RoundRobinJedisPool.PooledObject pool = (RoundRobinJedisPool.PooledObject)var3.next();
         addr2Pool.put(pool.addr, pool);
     }

     com.google.common.collect.ImmutableList.Builder<RoundRobinJedisPool.PooledObject> builder = ImmutableList.builder();
     Iterator var14 = this.watcher.getCurrentData().iterator();

     while(var14.hasNext()) {
         ChildData childData = (ChildData)var14.next();

         try {
         //封裝了proxy addr(19000埠)以及state(是否線上)
             CodisProxyInfo proxyInfo = (CodisProxyInfo)MAPPER.readValue(childData.getData(), CodisProxyInfo.class);
             if ("online".equals(proxyInfo.getState())) {
                 String addr = proxyInfo.getAddr();
                 RoundRobinJedisPool.PooledObject pool = (RoundRobinJedisPool.PooledObject)addr2Pool.remove(addr);

                 //第一次進來的時候,pool為null,會執行下面的pool的構造方法
                 if (pool == null) {
                     LOG.info("Add new proxy: " + addr);
                     String[] hostAndPort = addr.split(":");
                     String host = hostAndPort[0];
                     int port = Integer.parseInt(hostAndPort[1]);
                     pool = new RoundRobinJedisPool.PooledObject(addr, new JedisPool(this.poolConfig, host, port, this.connectionTimeoutMs, this.soTimeoutMs, this.password, this.database, this.clientName, false, (SSLSocketFactory)null, (SSLParameters)null, (HostnameVerifier)null));
                 }

                 builder.add(pool);
             }
         } catch (Throwable var12) {
             LOG.warn("parse " + childData.getPath() + " failed", var12);
         }
     }

     this.pools = builder.build();
     var14 = addr2Pool.values().iterator();

     while(var14.hasNext()) {
         RoundRobinJedisPool.PooledObject pool = (RoundRobinJedisPool.PooledObject)var14.next();
         LOG.info("Remove proxy: " + pool.addr);
         pool.pool.close();
     }

 }

建立好的RoundRobinJedisPool.pools如下所示

這裡寫圖片描述
這裡寫圖片描述

當一個pool準備好,下一步我們的操作就是從pool中取出相應的jedis例項,並進行相關操作。這個pool裡面只有服務正常的proxy,error的proxy會從zk被剔除掉,詳見 Codis原始碼解析——proxy新增到叢集

public Jedis getResource() {
    ImmutableList<RoundRobinJedisPool.PooledObject> pools = this.pools;
    if (pools.isEmpty()) {
        throw new JedisException("Proxy list empty");
    } else {
        int current;
        int next;
        do {
            //nextIdx初始值是-1
            current = this.nextIdx.get();
            //pools.size取決於叢集中有多少個proxy,可以看到,負載均衡演算法採取的是輪詢
            next = current >= pools.size() - 1 ? 0 : current + 1;
        } while(!this.nextIdx.compareAndSet(current, next));

        return ((RoundRobinJedisPool.PooledObject)pools.get(next)).pool.getResource();
    }
}

Jodis首先讓你本地的java程式通過zookeeper(或者etcd)連線到proxy,並監聽proxy的變化,因為可能根據需要對proxy做水平擴容。監聽過程中,對由PooledObject組成的pool進行重新整理,然後每次需要對codis叢集進行操作的時候,都按照輪詢負載均衡演算法從pool中取出一個jedis例項進行操作。