redis叢集-管道
阿新 • • 發佈:2018-12-25
造個輪子
輪子
- jedis自帶的cluster並不支援管道。所以得自己造輪子。
- ShardedJedisPipeline
- 先找一下類似的輪子。jedis-3.0.0.jar裡有一個切片(Sharded)相關的類。它有一個管道實現:ShardedJedisPipeline。它繼承自PipelineBase類。過載了 getClient方法,實現了sync方法。實際上管道先從連線池裡獲取一個jedispool,然後獲取一個client,所有的操作均呼叫這個client,然後通過sync方法將client中的指令一次性發送給server。先在的問題程式設計瞭如何獲取client。
- 獲取client。先檢視一下JedisCluster中原始碼。發現它用JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler類來持有連線。這個類有一個Jedis getConnectionFromSlot(int slot);方法可以通過槽位來獲取jedis連線。3.0的redis都使用CRC16演算法來計算hash,可以使用JedisClusterCRC16的getSlot方法,將key轉化成槽位即可。至於sync方法,照抄ShardedJedisPipeline即可,因為它也是使用的多client(其實不是因為懶,恩)。
- JClusterPipeline(自己的輪子,其實基本照抄ShardedJedisPipeline)
public class JClusterPipeline extends PipelineBase{
private JedisSlotBasedConnectionHandler connection;
private Queue<Client> clients = new LinkedList<Client>();
@Override
protected Client getClient(byte[] key) {
//這個地方可以使用map<solt,client> 來記錄已經申請到的連線,這樣每次需要連線的時候,現在map裡找,這樣就不會那麼容易連線超限。但是懶得寫……
Jedis jedis = connection.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
clients.add(jedis.getClient());
return jedis.getClient();
}
@Override
protected Client getClient(String key) {
byte[] bKey = SafeEncoder.encode(key);
return getClient(bKey);
}
public void clear() {
sync();
}
public void sync() {
for (Client client : clients) {
generateResponse(client.getOne());
}
}
public List<Object> syncAndReturnAll() {
List<Object> formatted = new ArrayList<Object>();
for (Client client : clients) {
formatted.add(generateResponse(client.getOne()).get());
}
return formatted;
}
@Override
public void close() {
clean();
for (Client client : clients) {
client.close();
}
clients.clear();
}
public void setConnectionHandler(JedisSlotBasedConnectionHandler basedConnectionHandler) {
this.connection = basedConnectionHandler;
}
public void renewSlotCache() {
connection.renewSlotCache();
}
}
用輪子
- 要獲取JedisCluster中的JedisSlotBasedConnectionHandler,要麼反射,要麼繼承。我選擇繼承。
public class MyJedisCluster extends JedisCluster{
private JedisSlotBasedConnectionHandler basedConnectionHandler;
public MyJedisCluster(Set<HostAndPort> nodes) {
super(nodes);
//如果用JedisClusterConnectionHandler,沒法直接調Jedis getConnectionFromSlot(int slot);方法
basedConnectionHandler = (JedisSlotBasedConnectionHandler) connectionHandler;
}
public Pipeline pipeline() {
JClusterPipeline pipeline = new JClusterPipeline();
pipeline.setConnectionHandler(basedConnectionHandler);
return pipeline;
}
@Override
public void close() {
super.close();
}
}
一個意外的發現ClusterPipeline
- 翻了翻jar,發現有一個ClusterPipeline,裡面聲明瞭一些有關cluster的方法,實現一下。這個介面被MultiKeyPipelineBase實現了,所以照抄一下。舉個栗子。
MultiKeyPipelineBase.java 的實現
public Response<String> clusterNodes() {
client.clusterNodes();
return getResponse(BuilderFactory.STRING);
}
這裡需要一個client。考慮到連線是可貴的,可以先看一下clients裡有沒有已經獲取到的client,
如果有直接使用,如果沒有,就使用connection.getConnection().getClient();方法獲取一個client。
JClusterPipeline的實現
public Response<String> clusterNodes() {
Client client = null;
if(clients.isEmpty()){
client = connection.getConnection().getClient();
}else {
client = clients.peek();
}
client.clusterNodes();
clients.add(client);
return getResponse(BuilderFactory.STRING);
}
一個坑
- 獲取連線的時候,一定要注意不要超過連線池的數量,不然會一直鎖住,直到獲取到連線。假如有十個連線,如果操作的11個key都在不同的槽裡,這樣一次操作就會申請11個連線,這就超了……
- 由於Cluster使用的是去中心化設計,所以客戶端無法感知server節點的變化,所以得手動meet。不過這個可以通過一個代理來實現,要麼自己寫,要麼用twitter 開源的 twemproxy。知乎上說效能會下降18%。