1. 程式人生 > >Redis之Pipeline使用注意事項

Redis之Pipeline使用注意事項

參考內容:http://www.redis.cn/topics/pipelining.html


重要說明: 使用管道傳送命令時,伺服器將被迫回覆一個佇列答覆,佔用很多記憶體。所以,如果你需要傳送大量的命令,最好是把他們按照合理數量分批次的處理,例如10K的命令,讀回覆,然後再發送另一個10k的命令,等等。這樣速度幾乎是相同的,但是在回覆這10k命令佇列需要非常大量的記憶體用來組織返回資料內容。


Jedis jedis = poolFactory.getjedisResourcePool().getResource();
Pipeline pl = jedis.pipelined();
Pipeline 的特點:
1、Pipeline 實現的原理是佇列,而佇列的原理是時先進先出,這樣就保證資料的順序性
2、jedis.pipelined()方法會先建立一個pipeline的連結物件,詳細的步驟如下:
A)、建立一個新的Pipeline物件
public Pipeline pipelined()
{ pipeline = new Pipeline(); pipeline.setClient(client); return pipeline; }
B)、獲取一個連線並hset操作
public Response<Long> hset(String key, String field, String value)
{ getClient(key).hset(key, field, value); return getResponse(BuilderFactory.LONG); }
C)、把資料作為安全資料,進行操作
public void hset(final String key, final String field, final String value)
{ hset(SafeEncoder.encode(key), SafeEncoder.encode(field), SafeEncoder.encode(value)); }
D)、把指令與資料一塊
public void hset(final byte[] key, final byte[] field, final byte[] value)
{ sendCommand(HSET, key, field, value); }
E)、傳送流
Protocol.sendCommand(outputStream, cmd, args);
F)、做具體的資料操作
public static void sendCommand(final RedisOutputStream os, final Command command,
final byte[]... args)
{ sendCommand(os, command.raw, args); }

Pipeline 使用以及地從實現原理

1、Pipeline 是以流的形式進行儲存資料,Connection類中有載入拼接的命令,詳細如下:
protected Connection sendCommand(final Command cmd, final byte[]... args) {
try

{ connect(); Protocol.sendCommand(outputStream, cmd, args); pipelinedCommands++; return this; }

catch (JedisConnectionException ex) {
/*

  • When client send request which formed by invalid protocol, Redis send back error message
  • before close connection. We try to read it to provide reason of failure.
    */
    try Unknown macro: { String errorMessage = Protocol.readErrorLineIfPossible(inputStream); if (errorMessage != null && errorMessage.length() > 0) { ex = new JedisConnectionException(errorMessage, ex.getCause()); } }

    catch (Exception e)

    { /* * Catch any IOException or JedisConnectionException occurred from InputStream#read and just * ignore. This approach is safe because reading error message is optional and connection * will eventually be closed. */ }

    // Any other exceptions related to connection?
    broken = true;
    throw ex;
    }
    }

在Protocol類中有把資料拼接成資料,其中args是拼接資料的引數
private static void sendCommand(final RedisOutputStream os, final byte[] command,
final byte[]... args) {
try {
os.write(ASTERISK_BYTE);
os.writeIntCrLf(args.length + 1);
os.write(DOLLAR_BYTE);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
for (final byte[] arg : args)

{ os.write(DOLLAR_BYTE); os.writeIntCrLf(arg.length); os.write(arg); os.writeCrLf(); }

} catch (IOException e)

{ throw new JedisConnectionException(e); }

}

2、關於Pipeline 同步資料的問題
A)、Pipeline 有與redis形同的操作,但是在資料落盤的時候需要在執行的方法後新增sync()方法,如果insert時有多條資料,在資料拼接完之後,在執行sync()方法,這樣可以提高效率。
B)、如果在hget()時沒有sync()時會報,沒有在hget()同步資料
C)、如果在hset(),hdel(),hget()獲取資料時都沒有執行sync()方法,但是在最後執行了pl.close()方法,Pipeline 同樣會執行sync()方法,詳細的程式碼如下:
Pipeline類下的close()方法中的clear(),有sync()方法,
@Override
public void close() throws IOException

{ clear(); }

public void clear() {
if (isInMulti())

{ discard(); }

sync();
}

可以看出client.getAll()獲取到了所有的資料,去執行sync()方法
public void sync() {
if (getPipelinedResponseLength() > 0) {
List<Object> unformatted = client.getAll();
for (Object o : unformatted)

{ generateResponse(o); }

}
}

3、Pipeline 的預設的同步的個數為53個,也就是說arges中累加到53條資料時會把資料提交