1. 程式人生 > 其它 >Redis學習之Jedis原始碼原理分析探究(BIO手寫Jedis客戶端)

Redis學習之Jedis原始碼原理分析探究(BIO手寫Jedis客戶端)

  在Redis的使用過程中,大多數人都是使用現成的客戶端,如Jedis,Redisson,Lettuce。因此本文研究用BIO的方式手寫Redis客戶端嘗試,對遇到的問題進行探究及總結。

    如何從架構角度思考架構分層?

    Redis通訊協議RESP是怎麼回事?

    如何基於BIO實現Redis客戶端?

    Redis客戶端通訊執行緒安全問題如何解決?

一、Jedis客戶端如何進行架構分層

  要進行遠端訪問,如下圖所示:

              

  • 我們在Java應用程式的客戶端通過訪問包裝的API進行Redis訪問,API使我們直接可以看懂的呼叫入口;
  • 然後API是對Redis通訊協議的包裝,通過對協議的包裝,實現我們對Redis訪問協議的透明使用;
  • 協議是按照一定規則組裝的資料,並不能直接用於網路IO,所以必須進行序列化和反序列化,這樣才能進行遠端Redis的請求呼叫以及返回資料的處理。

二、傳輸層通訊

  基於遠端訪問,我們可以使用BIO的Socket進行通訊,首先定義一個Connection,Connection類包含了建立BIO連線的遠端地址host,埠port,套接字Socket以及輸入輸出流。

  此類一個Connection的構造方法,一個Connection的初始化方法,以及請求傳送方法。

public class Connection {

    private String host;
    private int port;
    private Socket socket;
    private InputStream inputStream;
    private OutputStream outputStream;

    public Connection(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public boolean isConnection() {

        if (socket != null && !socket.isClosed() && !socket.isBound() && socket.isConnected()) {
            return true;
        }
        try {
            socket = new Socket(host, port);
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public String sendCommand(byte[] command) {
        if (isConnection()) {
            try {
                outputStream.write(command);
                
int length = 0; byte[] response = new byte[1024]; while ((length = inputStream.read(response)) > 0) { return new String(response, 0, length); } } catch (IOException e) { e.printStackTrace(); } } return null; } }

  有了連線類後就可以傳送BIO請求,然後就構建Connection進行請求傳送測試:

public class MainTest {
    public static void main(String[] args) {

        String command = "set ant 123456";
        Connection connection = new Connection("localhost", 6379);
        System.out.println(connection.sendCommand(command.getBytes()));
  }
}

  發現結果如下圖,請求呼叫後沒有返回,但是main方法也沒有結束,通過debug可以知道是因為inputStream.read(response))這一句程式碼是阻塞呼叫,因為一直沒返回結果,因此main方法阻塞,如下圖:

          

  實際上的原因是因為任何請求都是基於協議,傳送了請求command = "set ant 123456"後,由於並未遵循Redis的任何訪問協議,因此Redis無法識別請求並做出返回。

三、協議層包裝

  我們先進行抓包測試,看看Jedis客戶端傳給服務端的到底是些什麼內容,我們自己手寫一個假的服務端,然後用Jedis傳送請求,我們的偽服務端會接受到請求,偽服務端如下所示:

public class Hack {

   public static void main(String[] args) throws Exception {
      ServerSocket serverSocket = new ServerSocket(10000);
      Socket socket = serverSocket.accept();
      byte[] b=new byte[1024];
      socket.getInputStream().read(b);
      System.out.println(new String(b));
   }
}

  然後用一個客戶端進行呼叫:

public class Test {

   public static void main(String[] args) {
      Jedis jedis = new Jedis("redis://localhost:10000");
      jedis.set("monkey", "2019");
      System.out.println(jedis.get("monkey"));
   }
}

  在偽服務端可以接收到請求:

*3
$3
SET
$6
monkey
$4
2019

  可以看到請求是根據一定規則進行了包裝,這就是Redis的RESP協議。RESP協議是在Redis 1.2中引入的,但是它成為Redis 2.0中與Redis伺服器通訊的標準方法。RESP實際上是支援以下資料型別的序列化協議:簡單字串,錯誤,整數,大容量字串和陣列。

  RESP在Redis中用作請求-響應協議的方式如下:

  • 客戶端將命令作為大容量字串的RESP陣列傳送到Redis伺服器。
  • 伺服器根據命令實現以RESP型別之一進行回覆。

  在RESP中,某些資料的型別取決於第一個位元組:

  • 對於簡單字串,答覆的第一個位元組為“ +”
  • 對於錯誤,回覆的第一個位元組為“-”
  • 對於整數,答覆的第一個位元組為“:”
  • 對於批量字串,答覆的第一個位元組為“ $”
  • 對於陣列,回覆的第一個位元組為“*

  另外,RESP可以使用Bulk Strings或Array的特殊變體來表示Null值,如稍後指定。在RESP中,協議的不同部分始終以“ \ r \ n”(CRLF)終止。詳情請檢視https://redis.io/topics/protocol

  我們知道了Redis的RESP規則,那麼我們就可以定義一個協議類Protocol,來實現請求的包裝(本示例未完全實現RESP協議內容,僅實現簡單的SET、GET請求以及內容的解析):

public class Protocol {

    public static final String DOLLER="$";
    public static final String ALLERSTIC="*";
    public static final String CRLF="\r\n";
  
  // 如SET請求 set ant 7777 
  // *3\r\n        長度為3的陣列
  // $3\r\n        第一個字串長度為3
  // SET\r\n       第一個字串為SET
  // $6\r\n        第二個字串長度為6
  // monkey\r\n       第二個字串為ant 
  // $4\r\n        第三個字串長度為4
  // 2019\r\n      第三個字串為2019
    public static byte[] buildRespByte(Command command, byte[]... bytes){
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append(ALLERSTIC).append(bytes.length+1).append(CRLF);

     // 封裝方法SET、GET        
     stringBuilder.append(DOLLER).append(command.name().length()).append(CRLF); stringBuilder.append(command.name()).append(CRLF); // 封裝引數 for(byte[] arg:bytes){ stringBuilder.append(DOLLER).append(arg.length).append(CRLF); stringBuilder.append(new String(arg) ).append(CRLF); } return stringBuilder.toString().getBytes(); } public enum Command{ SET,GET } }

  此時我們就可以進行呼叫:

public class MainTest {

    public static void main(String[] args) {
        Connection connection = new Connection("localhost",6379);
        System.out.println(connection.sendCommand(Protocol.buildRespByte(Protocol.Command.SET, "monkey".getBytes(), "2019".getBytes())));
        System.out.println(connection.sendCommand(Protocol.buildRespByte(Protocol.Command.GET, "monkey".getBytes())));
    }

}

  然後得到返回結果為:

+OK

$4
2019

四、API包裝

  如上述呼叫方式,我們直接組裝Connection以及入參進行呼叫是極其不友好的,因此我們建立一個Clientset和get方法進行封裝,然後暴露出呼叫API:

public class Client {

    private Connection connection;

    public SelfRedisClient(String host, int ip) {
        connection = new Connection(host, ip);
    }

    public String set(String key, String value) {
        String result = connection.sendCommand(
                Protocol.buildRespByte(Protocol.Command.SET, key.getBytes(), value.getBytes()));
        return result;
    }

    public String get(String key) {
        String result = connection.sendCommand(
                Protocol.buildRespByte(Protocol.Command.GET, key.getBytes()));
        return result;
    }
}

  然後呼叫Main方法:

public class MainTest {
    public static void main(String[] args) {
        Client client = new Client("localhost", 6379);
        System.out.println(client.set("ant", "123456"));
        System.out.println(client.get("ant"));
    }
}

  可以看出結果正常返回,當然我們未對返回結果使用協議解析, 不過這樣使用API呼叫方式已經得到了極大的簡化:

      

五、使用多執行緒對Redis進行請求

  上面的示例是在單執行緒的訪問情況下進行的測試,那麼在多執行緒情況下會如何呢。接下來我們構建一個執行緒池,使用多執行緒對Redis進行請求嘗試,構建一個ClientRunnable方法如下:

public class ClientRunnable implements Runnable {

    private Client client;
    private String value;

    public ClientRunnable(Client client, String value) {
        this.client = client;
        this.value = value;
    }
    @Override
    public void run() {
        client.set("ant", value);
    }
}

  main方法如下:

public class MainTest {
    public static void main(String[] args) {
        Client client = new client("localhost", 6379);
        ExecutorService pool = Executors.newCachedThreadPool();
        for(int i=0;i<20;i++){
            pool.execute(new ClientRunnable(client,"value"+i));
        }
    }
}

  並在set方法中增加輸出到控制檯:

public String set(String key, String value) {
    String result = connection.sendCommand(
            Protocol.buildRespByte(Protocol.Command.SET, key.getBytes(), value.getBytes()));
    System.out.println("Thread name: " + Thread.currentThread().getName() + "[result]: "
            + result.replace("\r\n", "") + " [value]: " + value);
    return result;
}

  檢視結果如下:

        

  發現不但返回結果一次出現了兩個甚至多個Redis服務其返回的OK,而且main方法還未執行結束。為什麼呢,因為在多執行緒下Socket是執行緒不安全的,當多個執行緒訪問Socket的時候,同時傳送了請求,然後請求的返回結果會累積,然後被一個執行緒完全獲取的情況,其餘傳送了請求的執行緒將一直阻塞等待返回,可是已經被先來的執行緒截取了流,因此程式無法繼續執行。

            

  因此現在就需要一個執行緒池來管理Connection,每個執行緒使用一個單獨的Connection,對於沒有拿到Connection的執行緒就在阻塞佇列等待,直到有執行緒完成呼叫,並將Connection釋放回執行緒池,被阻塞的執行緒才繼續進行呼叫。如下圖:

          

六、實現Connection的執行緒池管理

  首先實現一個阻塞佇列用於管理特定數量的Connection,當有Connection使用時就返回Connection,用完Connection後就進行歸還。

public class RedisClientPool {

    private LinkedBlockingQueue<SelfRedisClient> linkedBlockingQueue;

    public RedisClientPool(String host,int port ,int connectionCount){
        this.linkedBlockingQueue = new LinkedBlockingQueue<SelfRedisClient>(connectionCount);
        for(int i=0;i<connectionCount;i++){
            Client client = new Client(host,port);
            linkedBlockingQueue.add(client);
        }
    }

    public Client getClient(){
        try{
            return linkedBlockingQueue.take();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return null;
    }

    public void returnClient(Client client) {
        if(client != null){
            linkedBlockingQueue.add(client);
        }
    }
}

  修改ClientRunnable方法,改為從執行緒池獲取Connection進行請求呼叫:

public class ClientRunnable implements Runnable {

    private RedisClientPool redisClientPool;
    private String value;

    public ClientRunnable(RedisClientPool redisClientPool, String value) {
        this.redisClientPool = redisClientPool;
        this.value = value;
    }

    @Override
    public void run() {
        // 執行前先去管理Connection的阻塞佇列中獲取封裝了Connection的SelfRedisClient
        Client client = redisClientPool.getClient();
        client.set("ant", value);
        // 使用完後進行歸還client
        redisClientPool.returnClient(client);
    }
}

  使用Main方法進行請求呼叫:

public class MainTest {
    public static void main(String[] args) {
        RedisClientPool redisClientPool = new RedisClientPool("localhost",6379,5);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for(int i=0;i<10;i++){
            executorService.execute(new ClientRunnable(redisClientPool,"value"+i));
        }
    }
}

  檢視執行結果:    

          

  可以知道成功返回了所有的請求呼叫,最後也是執行緒9成功將value值修改為value8。

  因此,可以發現使用一個阻塞佇列對Connection資源進行管理不僅近能節省Connection的建立和回收時間,在本例中更核心的功能是實現了執行緒不安全資源的管理。