1. 程式人生 > >Android最佳實踐——深入淺出WebSocket協議

Android最佳實踐——深入淺出WebSocket協議

首先明確一下概念,WebSocket協議是一種建立在TCP連線基礎上的全雙工通訊的協議。概念強調了兩點內容:

  • TCP基礎上
  • 全雙工通訊

那麼什麼是全雙工通訊呢?

全雙工就是指客戶端和服務端可以同時進行雙向通訊,強調同時、雙向通訊

WebSocket可以應用於即時通訊等場景,比如現在直播很火熱,直播中的彈幕也可以使用WebSocket去實現。

在Android客戶端,一般可以使用下面的庫完成WebSocket通訊

  • okhttp,一般人我不告訴他okhttp還可以用來進行WebSocket通訊
  • Java-WebSocket,純java實現的WebSocket客戶端和服務端實現

那麼在沒有服務端支援的情況下,我們客戶端如何進行WebSocket的測試呢?一般人我也不告訴他!答案還是okhttp,這次是okhttp的擴充套件模組mockserver,不過最新版本的okhttp已經把WebSocket合入okhttp核心庫中去了,如果你用的版本比較低,就可能需要依賴okhttp-ws模組。

先來看協議內容組成,先上一張神圖

這裡寫圖片描述

WebSocket按上面圖中協議規則進行傳輸,上圖稱為一個數據幀。

  • FIN,共1位,標記訊息是否是最後1幀,1個訊息由1個或多個數據幀構成,若訊息由1幀構成,起始幀就是結束幀。
  • RSV1,RSV2,RSV3,各1位,預留位,用於自定義擴充套件。如果沒有擴充套件,各位值為0;如果定義了擴充套件,即為非0值。如果接收的幀中此處為非0,但是擴充套件中卻沒有該值的定義,那麼關閉連線。
  • OPCODE,共4位,幀型別,分為控制幀和非控制幀。如果接收到未知幀,接收端必須關閉連線。已定義的幀型別如下圖所示:
    這裡寫圖片描述

除了上圖中的0,1,2外(0x0,0x1,0x2),3-7(0x3-0x7)暫時沒有進行定義,為以後的非控制幀保留。
除了上圖中的8,9,10(0x8,0x9,0xA)外,11-15(0xB-0xF)暫時沒有進行定義,為以後的控制幀保留。

訊息的分片,一般來說,對於一個長度較小的訊息,可以使用1幀完成訊息的傳送, 比如說文字訊息,Fin的值為1,表示結束,Opcode值不能為0,0表示後續還有資料幀會發送過來。
而對於一些長度較長的訊息,則需要將訊息進行分片傳送。比如語音訊息,這時候起始幀的FIN值為0,Opcode為非0,接著是若干幀(FIN值都為0,Opcode值為0),結束幀FIN值為1,Opcode值為0。

WebSocket的控制幀有3種,關閉幀、Ping幀、Pong幀,關閉幀很好理解,客戶端如果收到關閉幀直接關閉連線即可,當然客戶端也可以傳送關閉幀給伺服器端。而Ping幀和Pong幀則是WebSocket的心跳檢測,用於保證客戶端是線上的,一般來說,只有服務端給客戶端傳送Ping幀,然後客戶端傳送Pong幀進行迴應,表示自己還線上,可以進行後續通訊。

  • MASK,共1位,掩碼位,表示幀中的資料是否經過加密,客戶端發出的資料幀需要經過掩碼處理,這個值都是1。如果值是1,那麼Masking-key域的資料就是掩碼祕鑰,用於解碼PayloadData,否則Masking-key長度為0。

WebSocket協議規定資料通過幀序列傳輸。客戶端必須對其傳送到伺服器的所有幀進行掩碼處理。

伺服器一旦收到無掩碼幀,將關閉連線。伺服器可能傳送一個狀態碼是1002(表示協議錯誤)的Close幀。

而伺服器傳送客戶端的資料幀不做掩碼處理,一旦客戶端發現經過掩碼處理的幀,將關閉連線。客戶端可能使用狀態碼1002。

更多狀態碼如下圖所示:

這裡寫圖片描述

  • Payload len,7位或者7+16位或者7+64位,表示資料幀中資料大小,這裡有好幾種情況。

    • 如果值為0-125,那麼該值就是payload data的真實長度。
    • 如果值為126,那麼該7位後面緊跟著的2個位元組就是payload data的真實長度。
    • 如果值為127,那麼該7位後面緊跟著的8個位元組就是payload data的真實長度。
    • 長度遵循一個原則,就是用最少的位元組表示長度,舉個例子,當payload data的真實長度是124時,在0-125之間,必須用7位表示;不允許將這7位表示成126或者127,然後後面用2個位元組或者8個位元組表示124,這樣做就違反了原則。
  • Masking-key ,0或者4個位元組,當MASK位為1時,4個位元組,否則0個位元組。如果MASK值為1,則發出去的資料需要經過加密處理,加密流程如下:

void mask(byte[] original, byte[] maskKey) {
   for (int i = 0; i < original.length; i++) {
        original[i] = (byte) (original[i] ^ maskKey[i % 4]);
   }
}
  • 最後就是Payload data,其大小是(x+y)個位元組,x是Extension data,即擴充套件資料,y是Application data,即程式資料,擴充套件資料可能為0。 如果擴充套件資料不為0,必須提前進行協商,規定其長度,否則是不合法的資料幀。

以上是WebSocket資料傳輸的幀內容,大致瞭解即可。除此之外,WebSocket協議還有一個握手的過程。握手通過傳送一個http請求來完成,這裡基本和http2.0有點類似,客戶端傳送一個請求協議升級的get請求給服務端,服務端如果支援的話會返回http code 為101,表示可以切換到對應的協議。大致流程如下:

  • 客戶端傳送get請求協議升級
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: RCfYMqhgCo4N4E+cIZ0iPg==
Sec-WebSocket-Version: 13

該請求會在請求頭上帶上WebSocket的版本號,這裡是13,以及客戶端隨機生成的Sec-WebSocket-Key,伺服器端收到後根據這個key進行一些處理,返回一個Sec-WebSocket-Accept的值給客戶端。

  • 服務端返回同意升級到WebSocket協議
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: upgrade
Sec-WebSocket-Accept: b7RAFizjwDE9lWS46ZMPfmN35wc=

收到響應後,響應頭中包含Sec-WebSocket-Accept值,該值表示伺服器端同意握手,值的計算方式如下:

$(Sec-WebSocket-Accept)=BASE64(SHA1($(Sec-WebSocket-Key)+"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))

客戶端得到該值後,對本地的Sec-WebSocket-Key進行同樣的編碼,然後對比,如果相同則可以進行後續處理。

關於WebSocket協議,一般來說,如果是通過https協議開始升級而來的,那麼一般是wss://開頭,如果是http協議開始升級而來的,那麼一般是ws://開頭

講完了概念性的東西,接下來就是最佳實踐了。

那麼客戶端怎麼進行WebSocket測試呢?這裡我們使用OkHttp的擴充套件模組Mock Server來實現。

首先引入okhttp依賴和mockserver依賴,對maven來說,內容如下

<dependency>
  <groupId>com.squareup.okhttp3</groupId>
  <artifactId>okhttp</artifactId>
  <version>3.4.1</version>
</dependency>
<dependency>
  <groupId>com.squareup.okhttp3</groupId>
  <artifactId>mockwebserver</artifactId>
  <version>3.4.1</version>
</dependency>

對gradle來說,其內容如下

compile 'com.squareup.okhttp3:okhttp:3.4.1'
compile 'com.squareup.okhttp3:mockwebserver:3.4.1'

接下來我們實現一個功能,功能大致如下:

  • 客戶端和服務端進行建連
  • 連線建立後客戶端向服務端傳送文字內容command 1
  • 伺服器端收到文字內容command 1後返回給客戶端內容replay command 1
  • 客戶端收到了伺服器端返回的replay command 1後再次向服務端傳送command 2
  • 服務端收到文字內容command 2後,傳送一個ping幀,客戶端收到ping幀後會將ping幀內容原封不動的以pong幀返回給伺服器端
  • 伺服器端收到客戶端返回的心跳pong幀後,傳送一個關閉連線的控制幀
  • 客戶端收到關閉控制幀後關閉連線
  • 伺服器端檢測到客戶端關閉連線,關閉連線

這裡需要注意一點,okhttp內部對執行緒做了檢測,也就是收到訊息的執行緒為read執行緒,那麼回覆訊息不能再read執行緒中去回覆,而要開一個write執行緒,具體可以看原始碼,不遵循的話就就會扔異常出來。

 if (Thread.currentThread() == looperThread) {
      throw new IllegalStateException("attempting to write from reader thread");
    }

looperThread就是read執行緒。

知道了這一點後,我們根據上面的步驟實現一下,首先是server端,使用MockWebServer構造一個mock server物件,順便new一個執行緒池,用於write執行緒回寫訊息。

private final MockWebServer mockWebServer = new MockWebServer();
private final ExecutorService writeExecutor = Executors.newSingleThreadExecutor();

然後起一個webserver

mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new WebSocketListener() {
    WebSocket webSocket = null;

    @Override
    public void onOpen(final WebSocket webSocket, Response response) {
        //儲存引用,用於後續操作
        this.webSocket = webSocket;
        //列印一些內容
        System.out.println("server onOpen");
        System.out.println("server request header:" + response.request().headers());
        System.out.println("server response header:" + response.headers());
        System.out.println("server response:" + response);


    }

    @Override
    public void onMessage(ResponseBody message) throws IOException {
        String string = message.string();
        System.out.println("server onMessage");
        System.out.println("message:" + string);
        //注意下面都是write執行緒回寫給客戶端
        //當收到客戶端的command 1時回覆replay command 1
        if ("command 1".equals(string)) {
            //replay it
            writeExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        webSocket.message(RequestBody.create(WebSocket.TEXT, "replay command 1"));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        } else if ("command 2".equals(string)) {
            //當收到客戶端的command 2時,傳送ping幀
            //ping it
            writeExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        webSocket.ping(ByteString.of("ping from server...".getBytes()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

    }

    @Override
    public void onPong(ByteString payload) {
        //列印一些內容
        System.out.println("server onPong");
        //注意下面都是write執行緒回寫給客戶端
        //客戶端收到ping幀後會回覆pong幀,回撥到這,收到pong幀後關閉連線
        //close it
        writeExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    webSocket.close(1000, "Normal Closure");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

    }

    @Override
    public void onClose(int code, String reason) {
        //列印一些內容
        System.out.println("server onClose");
        System.out.println("code:" + code + " reason:" + reason);
    }

    @Override
    public void onFailure(Throwable t, Response response) {
        //出現異常會進入此回撥
        System.out.println("server onFailure");
        System.out.println("throwable:" + t);
        System.out.println("response:" + response);
    }
}));

然後是客戶端的實現,也安裝上面的步驟來即可。

不過這之前需要知道伺服器端的Host和port,這兩個值可以通過mockWebServer物件獲得。

String hostName = mockWebServer.getHostName();
int port = mockWebServer.getPort();

System.out.println("hostName:" + hostName);
System.out.println("port:" + port);

然後通過host和port構造請求

//新建client
OkHttpClient client = new OkHttpClient.Builder()
        .build();
//構造request物件
Request request = new Request.Builder()
        .url("ws://" + hostName + ":" + port + "/")
        .build();
//new 一個websocket呼叫物件並建立連線
client.newWebSocketCall(request).enqueue(new WebSocketListener() {
    WebSocket webSocket = null;

    @Override
    public void onOpen(final WebSocket webSocket, Response response) {
        //儲存引用,用於後續操作
        this.webSocket = webSocket;
        //列印一些內容
        System.out.println("client onOpen");
        System.out.println("client request header:" + response.request().headers());
        System.out.println("client response header:" + response.headers());
        System.out.println("client response:" + response);
        //注意下面都是write執行緒回寫給客戶端
        //建立連線成功後,發生command 1給伺服器端
        writeExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    webSocket.message(RequestBody.create(WebSocket.TEXT, "command 1"));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void onMessage(ResponseBody message) throws IOException {
        //列印一些內容
        String string = message.string();
        System.out.println("client onMessage");
        System.out.println("message:" + string);

        //注意下面都是write執行緒回寫給客戶端
        if ("replay command 1".equals(string)) {
            //收到伺服器返回的replay command 1後繼續向伺服器端傳送command 2
            //replay it
            writeExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        webSocket.message(RequestBody.create(WebSocket.TEXT, "command 2"));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    @Override
    public void onPong(ByteString payload) {
        //列印一些內容
        System.out.println("client onPong");
        System.out.println("payload:" + payload);
    }

    @Override
    public void onClose(int code, String reason) {
        //列印一些內容
        System.out.println("client onClose");
        System.out.println("code:" + code + " reason:" + reason);
    }

    @Override
    public void onFailure(Throwable t, Response response) {
        //發生錯誤時會回撥到這
        System.out.println("client onFailure");
        System.out.println("throwable:" + t);
        System.out.println("response:" + response);
    }
}); 

最終輸出如下圖所示

這裡寫圖片描述

除了文字內容外,也可以傳送二進位制內容,如影象,語音,視訊等,所以我們完全可以自定義傳送的內容。

webSocket.message(RequestBody.create(WebSocket.BINARY,bytes));

而除了okhttp外,我們也可以使用Java-Websocket庫來實現,其maven依賴如下

<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.3.0</version>
</dependency> 

gradle依賴如下

compile 'org.java-websocket:Java-WebSocket:1.3.0'

用法也和okhttp類似,具體細節不追究,大概給一個demo,開啟一個mock server可以使用WebSocketServer物件,因為run了一個server只會會迴圈阻塞當前執行緒,所以我們在子執行緒中run。

private final ExecutorService executorService = Executors.newSingleThreadExecutor();

try {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            WebSocketServer webSocketServer = new WebSocketServer(new InetSocketAddress("localhost", 8080)) {
                @Override
                public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
                    System.out.println("server onOpen");
                }

                @Override
                public void onClose(WebSocket webSocket, int i, String s, boolean b) {
                    System.out.println("server onClose:" + i + " " + s + " " + b);
                }

                @Override
                public void onMessage(WebSocket webSocket, String s) {
                    System.out.println("server onMessage:" + s);
                }

                @Override
                public void onError(WebSocket webSocket, Exception e) {
                    System.out.println("server onMessage:" + e);
                }
            };
            webSocketServer.run();
        }
    });

} catch (Exception e) {
    e.printStackTrace();
}

然後客戶端可以使用WebSocketClient物件

private final ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
    @Override
    public void run() {
        Map<String, String> headers = new HashMap();
        WebSocketClient webSocketClient = new WebSocketClient(URI.create("ws://localhost:8080/"), new Draft_17(), headers, 10) {

            @Override
            public void onOpen(ServerHandshake serverHandshake) {
                System.out.println("client onOpen");
            }

            @Override
            public void onMessage(String s) {
                System.out.println("client onMessage:" + s);
            }

            @Override
            public void onClose(int i, String s, boolean b) {
                System.out.println("client onClose:" + i + " " + s + " " + b);
            }

            @Override
            public void onError(Exception e) {
                System.out.println("client onError:" + e);
            }
        };

        webSocketClient.connect();
    }
});

知道了如何使用之後,我們來深究一些okhttp內部是怎麼實現的WebSocket協議,其內部定義了三個介面,首先是WebSocket介面,用於實現傳送訊息幀,ping檢測心跳,close關閉連線,其內部還定義了兩個常量,用於傳送不同型別的幀。

public interface WebSocket {
  //文字幀時使用
  MediaType TEXT = MediaType.parse("application/vnd.okhttp.websocket+text; charset=utf-8");
  //二進位制幀時使用
  MediaType BINARY = MediaType.parse("application/vnd.okhttp.websocket+binary");


  void message(RequestBody message) throws IOException;


  void ping(ByteString payload) throws IOException;


  void close(int code, String reason) throws IOException;
}

接著是WebSocketListener介面,用於進行各種回撥,如建立連線成功時的回撥,收到訊息幀時的回撥,收到Pong幀時的回撥,關閉連線時的回撥,以及連線過程中發生任何錯誤的回撥,其定義如下:

public interface WebSocketListener {

  void onOpen(WebSocket webSocket, Response response);

  void onMessage(ResponseBody message) throws IOException;

  void onPong(ByteString payload);

  void onClose(int code, String reason);

  void onFailure(Throwable t, Response response);
}

最後一個是類似Http請求時OkHttp返回的Call物件,定義了幾個方法,如獲取request物件,非同步請求,取消連線,判斷是否已經執行過,是否已經被取消了,以及一個clone方法,返回一個可被重新執行的WebSocketCall物件,此外,內部還定義了一個Factory介面,該介面被OkHttpClient所實現,用於返回一個WebSocketCall物件,從而建立WebSocket連線。

public interface WebSocketCall extends Cloneable {
  Request request();

  void enqueue(WebSocketListener listener);

  void cancel();

  boolean isExecuted();

  boolean isCanceled();

  WebSocketCall clone();

  interface Factory {
    WebSocketCall newWebSocketCall(Request request);
  }
}

OkHttpClient內部實現的Factory介面中的方法如下,返回了WebSocketCall的實現類RealWebSocketCall。

public WebSocketCall newWebSocketCall(Request request) {
    return new RealWebSocketCall(this, request);
  }

在RealWebSocketCall建構函式中,主要做一件事情,就是構造請求協議升級的請求。必須是Get請求,然後生成一個隨機數,進行base64編碼,設定為請求頭Sec-WebSocket-Key的值,OkHttp內部實現的WebSocket版本是13,所以新增請求頭Sec-WebSocket-Version=13

  RealWebSocketCall(OkHttpClient client, Request request) {
    this(client, request, new SecureRandom());
  }

  RealWebSocketCall(OkHttpClient client, Request request, Random random) {
    if (!"GET".equals(request.method())) {
      throw new IllegalArgumentException("Request must be GET: " + request.method());
    }
    this.random = random;

    byte[] nonce = new byte[16];
    random.nextBytes(nonce);
    key = ByteString.of(nonce).base64();

    client = client.newBuilder()
        .readTimeout(0, SECONDS) // i.e., no timeout because this is a long-lived connection.
        .writeTimeout(0, SECONDS) // i.e., no timeout because this is a long-lived connection.
        .protocols(ONLY_HTTP1)
        .build();

    originalRequest = request;
    request = request.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .build();

    call = new RealCall(client, request, true /* for web socket */);
  }

當我們呼叫enqueue方法非同步進行連線時,就會發送建構函式裡構造的http升級協議請求,當伺服器端返回響應體時,進行解析,獲得StreamWebSocket物件。

StreamWebSocket create(Response response, WebSocketListener listener) throws IOException {
    if (response.code() != 101) {
      throw new ProtocolException("Expected HTTP 101 response but was '"
          + response.code()
          + " "
          + response.message()
          + "'");
    }

    String headerConnection = response.header("Connection");
    if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
      throw new ProtocolException(
          "Expected 'Connection' header value 'Upgrade' but was '" + headerConnection + "'");
    }
    String headerUpgrade = response.header("Upgrade");
    if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
      throw new ProtocolException(
          "Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
    }
    String headerAccept = response.header("Sec-WebSocket-Accept");
    String acceptExpected = Util.shaBase64(key + WebSocketProtocol.ACCEPT_MAGIC);
    if (!acceptExpected.equals(headerAccept)) {
      throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
          + acceptExpected
          + "' but was '"
          + headerAccept
          + "'");
    }

    String name = response.request().url().redact().toString();
    ThreadPoolExecutor replyExecutor =
        new ThreadPoolExecutor(1, 1, 1, SECONDS, new LinkedBlockingDeque<Runnable>(),
            Util.threadFactory(Util.format("OkHttp %s WebSocket Replier", name), true));
    replyExecutor.allowCoreThreadTimeOut(true);

    StreamAllocation streamAllocation = call.streamAllocation();
    streamAllocation.noNewStreams(); // Web socket connections can't be re-used.
    return new StreamWebSocket(streamAllocation, random, replyExecutor, listener, response, name);
  }

如果伺服器端返回的http code不是101,則表示升級協議失敗,扔出異常,然後會檢測響應頭中是否包含Connection,且對應的值是否是Upgrade,再判斷響應頭中是否包含Upgrade,且其值為websocket,如果不滿足條件,扔出異常,然後獲取響應頭中的Sec-WebSocket-Accept值,進行校驗,是否和預期的值是一樣。其計算方式就是建構函式中生成的隨機數的base64的值加上WebSocket的魔數258EAFA5-E914-47DA-95CA-C5AB0DC85B11,進行sha1後的base64值。然後構造StreamWebSocket物件返回。

返回後呼叫 webSocket.loopReader();方法進行迴圈。該方法首先會呼叫回撥介面中的onOpen方法告訴呼叫者建立連線成功了,然後不斷讀取訊息幀。讀取訊息幀的流程就是解析文章中最開始貼的圖中的協議內容。

 public final void loopReader() {
    looperThread = Thread.currentThread();

    try {
      try {
        readerListener.onOpen(this, response);
      } catch (Throwable t) {
        Util.throwIfFatal(t);
        replyToReaderError(t);
        readerListener.onFailure(t, null);
        return;
      }

      while (processNextFrame()) {
      }
    } finally {
      looperThread = null;
    }
  }

如讀取到控制幀時會根據不同的opcode回撥介面中的對應函式

 switch (opcode) {
      case OPCODE_CONTROL_PING:
        frameCallback.onReadPing(buffer.readByteString());
        break;
      case OPCODE_CONTROL_PONG:
        frameCallback.onReadPong(buffer.readByteString());
        break;
      case OPCODE_CONTROL_CLOSE:
        int code = CLOSE_NO_STATUS_CODE;
        String reason = "";
        long bufferSize = buffer.size();
        if (bufferSize == 1) {
          throw new ProtocolException("Malformed close payload length of 1.");
        } else if (bufferSize != 0) {
          code = buffer.readShort();
          reason = buffer.readUtf8();
          validateCloseCode(code, false);
        }
        frameCallback.onReadClose(code, reason);
        closed = true;
        break;
      default:
        throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
    }

當讀到ping幀時,會將原資料以pong幀返回

 @Override public final void onReadPing(ByteString buffer) {
    replyToPeerPing(buffer);
  }

 /** Replies with a pong when a ping frame is read from the peer. */
  private void replyToPeerPing(final ByteString payload) {
    Runnable replierPong = new NamedRunnable("OkHttp %s WebSocket Pong Reply", name) {
      @Override protected void execute() {
        try {
          writer.writePong(payload);
        } catch (IOException t) {
          Platform.get().log(INFO, "Unable to send pong reply in response to peer ping.", t);
        }
      }
    };
    synchronized (replier) {
      if (!isShutdown) {
        replier.execute(replierPong);
      }
    }
  }

當讀到pong幀時,直接回調

public final void onReadPong(ByteString buffer) {
    readerListener.onPong(buffer);
  }

當讀到close幀時,也是直接回調

  @Override public final void onReadClose(int code, String reason) {
    replyToPeerClose(code, reason);
    readerSawClose = true;
    readerListener.onClose(code, reason);
  }

再者讀到訊息幀的時候,就會讀取payload data中的資料,回撥frameCallback.onReadMessage方法,返回資料。

private void readMessageFrame() throws IOException {
    final MediaType type;
    switch (opcode) {
      case OPCODE_TEXT:
        type = WebSocket.TEXT;
        break;
      case OPCODE_BINARY:
        type = WebSocket.BINARY;
        break;
      default:
        throw new ProtocolException("Unknown opcode: " + toHexString(opcode));
    }

    final BufferedSource source = Okio.buffer(framedMessageSource);
    ResponseBody body = new ResponseBody() {
      @Override public MediaType contentType() {
        return type;
      }

      @Override public long contentLength() {
        return -1;
      }

      @Override public BufferedSource source() {
        return source;
      }
    };

    messageClosed = false;
    frameCallback.onReadMessage(body);
    if (!messageClosed) {
      throw new IllegalStateException("Listener failed to call close on message payload.");
    }
  }

frameCallback.onReadMessage會回撥到RealWebSocket中的onReadMessage,最終回撥給監聽器


  @Override public final void onReadMessage(ResponseBody message) throws IOException {
    readerListener.onMessage(message);
  }

同理,回覆訊息幀則是讀取訊息幀的逆過程,具體流程,有興趣自己看原始碼把~