1. 程式人生 > >redis叢集-管道

redis叢集-管道

造個輪子

輪子

  • jedis自帶的cluster並不支援管道。所以得自己造輪子。
  • ShardedJedisPipeline
    • 先找一下類似的輪子。jedis-3.0.0.jar裡有一個切片(Sharded)相關的類。它有一個管道實現:ShardedJedisPipeline。它繼承自PipelineBase類。過載了 getClient方法,實現了sync方法。實際上管道先從連線池裡獲取一個jedispool,然後獲取一個client,所有的操作均呼叫這個client,然後通過sync方法將client中的指令一次性發送給server。先在的問題程式設計瞭如何獲取client。
    • 獲取client。先檢視一下JedisCluster中原始碼。發現它用JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler類來持有連線。這個類有一個Jedis getConnectionFromSlot(int slot);方法可以通過槽位來獲取jedis連線。3.0的redis都使用CRC16演算法來計算hash,可以使用JedisClusterCRC16的getSlot方法,將key轉化成槽位即可。至於sync方法,照抄ShardedJedisPipeline即可,因為它也是使用的多client(其實不是因為懶,恩)。
  • JClusterPipeline(自己的輪子,其實基本照抄ShardedJedisPipeline)
public class JClusterPipeline extends PipelineBase{

    private JedisSlotBasedConnectionHandler connection;

    private Queue<Client> clients = new LinkedList<Client>();

    @Override
    protected Client getClient(byte[] key) {
    //這個地方可以使用map<solt,client> 來記錄已經申請到的連線,這樣每次需要連線的時候,現在map裡找,這樣就不會那麼容易連線超限。但是懶得寫……
Jedis jedis = connection.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); clients.add(jedis.getClient()); return jedis.getClient(); } @Override protected Client getClient(String key) { byte[] bKey = SafeEncoder.encode(key); return getClient(bKey); } public
void clear() { sync(); } public void sync() { for (Client client : clients) { generateResponse(client.getOne()); } } public List<Object> syncAndReturnAll() { List<Object> formatted = new ArrayList<Object>(); for (Client client : clients) { formatted.add(generateResponse(client.getOne()).get()); } return formatted; } @Override public void close() { clean(); for (Client client : clients) { client.close(); } clients.clear(); } public void setConnectionHandler(JedisSlotBasedConnectionHandler basedConnectionHandler) { this.connection = basedConnectionHandler; } public void renewSlotCache() { connection.renewSlotCache(); } }

用輪子

  • 要獲取JedisCluster中的JedisSlotBasedConnectionHandler,要麼反射,要麼繼承。我選擇繼承。
public class MyJedisCluster extends JedisCluster{

    private JedisSlotBasedConnectionHandler basedConnectionHandler;

    public MyJedisCluster(Set<HostAndPort> nodes) {
        super(nodes);
        //如果用JedisClusterConnectionHandler,沒法直接調Jedis getConnectionFromSlot(int slot);方法
        basedConnectionHandler = (JedisSlotBasedConnectionHandler) connectionHandler;
    }

    public Pipeline pipeline() {
        JClusterPipeline pipeline = new JClusterPipeline();
        pipeline.setConnectionHandler(basedConnectionHandler);
        return pipeline;
    }

    @Override
    public void close() {
        super.close();
    }
}

一個意外的發現ClusterPipeline

  • 翻了翻jar,發現有一個ClusterPipeline,裡面聲明瞭一些有關cluster的方法,實現一下。這個介面被MultiKeyPipelineBase實現了,所以照抄一下。舉個栗子。
MultiKeyPipelineBase.java 的實現
  public Response<String> clusterNodes() {
    client.clusterNodes();
    return getResponse(BuilderFactory.STRING);
  }
  這裡需要一個client。考慮到連線是可貴的,可以先看一下clients裡有沒有已經獲取到的client,
  如果有直接使用,如果沒有,就使用connection.getConnection().getClient();方法獲取一個client。
  JClusterPipeline的實現
    public Response<String> clusterNodes() {
        Client client = null;
        if(clients.isEmpty()){
            client = connection.getConnection().getClient();
        }else {
            client = clients.peek();
        }
        client.clusterNodes();
        clients.add(client);
        return getResponse(BuilderFactory.STRING);
    }

一個坑

  • 獲取連線的時候,一定要注意不要超過連線池的數量,不然會一直鎖住,直到獲取到連線。假如有十個連線,如果操作的11個key都在不同的槽裡,這樣一次操作就會申請11個連線,這就超了……
  • 由於Cluster使用的是去中心化設計,所以客戶端無法感知server節點的變化,所以得手動meet。不過這個可以通過一個代理來實現,要麼自己寫,要麼用twitter 開源的 twemproxy。知乎上說效能會下降18%。