WebSocket學習——結合OkHttp原始碼分析
轉自:https://www.jianshu.com/p/ba0f45aa7457
前言
最近公司有專案需要用WebSocket完成及時通訊的需求,這裡來學習一下。
WebScoket簡介
在以前的web應用中,雙向通訊機制往往藉助輪詢或是長輪詢來實現,但是這兩種方式都會或多或少的造成資源的浪費,且是非實時的。還有http長連線,但是本質上還是Request與Response,只是減少握手連線次數,雖然減少了部分開銷,但仍然會造成資源的浪費、實時性不強等問題。
WebSocket作為一種解決web應用雙向通訊的協議由HTML5規範引出(RFC6455傳送門),是一種建立在TCP協議基礎上的全雙工通訊的協議。
一、與傳統輪詢不同
這裡取網上流傳度很高的例子介紹
輪詢:
客戶端(發請求,建立連結):啦啦啦,有沒有新資訊(Request)
服務端:沒有(Response)
客戶端(發請求,建立連結):啦啦啦,有沒有新資訊(Request)
服務端:沒有。。(Response)
客戶端(發請求,建立連結):啦啦啦,有沒有新資訊(Request)
服務端:你好煩啊,沒有啊。。(Response)
客戶端(發請求,建立連結):啦啦啦,有沒有新訊息(Request)
服務端:好啦好啦,有啦給你。(Response)
客戶端(發請求,建立連結):啦啦啦,有沒有新訊息(Request)
服務端:。。。。。沒。。。。沒。。。沒有(Response)長輪詢:
客戶端(發請求,建立連結):啦啦啦,有沒有新資訊,沒有的話就等有了才返回給我吧(Request)
等等等。。。。。
服務端:額。。 等待到有訊息的時候。。來 給你(Response)
客戶端(發請求,建立連結):啦啦啦,有沒有新資訊,沒有的話就等有了才返回給我吧(Request)WebSocket:
客戶端:啦啦啦,我要建立Websocket協議,需要的服務:chat,Websocket協議版本:17(HTTP Request)
服務端:ok,確認,已升級為Websocket協議(HTTP Protocols Switched)
客戶端:麻煩你有資訊的時候推送給我噢。。
服務端:ok,有的時候會告訴你的。
服務端:balabalabalabala
客戶端:balabalabalabala
服務端:哈哈哈哈哈啊哈哈哈哈
服務端:笑死我了哈哈哈哈哈哈哈來自知乎高贊同回答WebSocket 是什麼原理?為什麼可以實現持久連線?
從上面的例子可以看出,不管是輪詢還是長輪詢,本質都是不斷地傳送HTTP請求,然後由服務端處理返回結果,並不是真正意義上的雙向通訊。而且帶來的後果是大量的資源被浪費(HTTP請求),服務端需要快速的處理請求,還要考慮併發等問題。而WebSocket解決了這些問題,通過握手操作後就建立了持久連線,之後客戶端和服務端在連線斷開之前都可以傳送訊息,實現真正的全雙工通訊。
二、與Socket、HTTP的關係
很多人剛接觸WebSocket肯定會與Socket混淆,這裡放出OSI模型
OSI Model
我們知道,Socket是對TCP/IP協議的封裝,Socket本身並不是協議,而是一個呼叫介面(API)。而WebSocket在圖中處於應用層,屬於應用層協議。所以二者僅僅是名字像而已,就像Java與JavaScript一樣。
TCP是傳輸層的協議,WebScoket和HTTP都是基於TCP協議的高層(應用層)協議,所以從本質上講,WebSocket和HTTP是處於同一層的兩種不同的協議。但是WebSocket使用了HTTP完成了握手連線,根據RFC6455文件中1.5節設計哲♂學中描述,是為了簡單和相容性考慮。具體握手操作我們會在後面提到。
所以總的來說,WebSocket與Socket由於層級不同,關係也僅僅是在某些環境中WebSocket可能通過Socket來使用TCP協議和名字比較像。和HTTP是同一層面的不同協議(最大的區別WebSocket是持久化協議而HTTP不是)。
WebScoket協議
這裡主要提一下協議中比較重要的握手和傳送資料
一、握手
之前有說到,WebSocket的握手是用HTTP請求來完成的,這裡我們來看一下RFC6455文件中一個客戶端握手的栗子
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
可以發現,這和一個一般的HTTP請求頭沒啥區別,需要注意的是(這裡講重點,具體還請看協議文件):
- 根據協議規範,握手必須是一個HTTP請求,請求的方法必須是GET,HTTP版本不可以低於1.1。
- 請求頭必須包含Upgrade屬性名,其值必須包含"websocket"。
- 請求頭必須包含Connection屬性名,其值必須包含"Upgrade"。
- 請求頭必須包含Sec-WebSocket-Key屬性名,其值是16位元組的隨機數的被base64編碼後的值
- 如果請求來自瀏覽器必須包含Origin屬性名
- 請求頭必須包含Sec-WebSocket-Version屬性名,其值必須是13
如果請求不符合規範,服務端會返回400 bad request。如果服務端選擇接受連線,則會返回比如:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
首先不同於普通的HTTP請求這裡返回101,然後Upgrade和Connection同上都是規定好的,Sec-WebSocket-Accept是由請求頭的Sec-WebSocket-Key加上字串258EAFA5-E914-47DA-95CA-C5AB0DC85B11之後再進行SHA1加密和BASE64編碼得到的值。返回的狀態碼為101,表示同意客戶端協議轉換請求,並將它轉換為websocket協議。
在握手成功之後,WebSocket連線建立,雙向通訊便可以開始了。
二、傳送資料
在WebSocket協議,資料使用幀來傳輸。一個基本的協議幀如下
基本協議幀
細節不描述了,好多地方沒看懂。。。這裡說下重點
三、資料幀型別
- 0x0 表示一個繼續幀
- 0x1 表示一個文字幀
- 0x2 表示一個二進位制幀
- 0x3-7 為以後的非控制幀
- 0x8 表示一個連線關閉幀
- 0x9 表示一個ping
- 0xA 表示一個pong
- 0xB-F 為以後的控制幀
大部分都十分明瞭,這裡來說說Ping,Pong幀:WebSocket用Ping,Pong幀來維持心跳,當接收到Ping幀,終端必須傳送一個Pong幀響應,除非它已經接收到一個關閉幀,它應該儘快返回Pong幀作為響應。Pong幀必須包含與被響應Ping幀的應用程式資料完全相同的資料。一個Pong幀可能被主動傳送,但一般不必須返回響應,也可以做特殊處理。
結合原始碼分析
首先WebSocket雖然是H5提出的,但不僅僅應用於Web應用上。在Android客戶端,一般用下面兩種庫完成WebSocket:
- OkHttp 16年OkHttp就加入了WebSocket支援包,最新版本已經將ws融合進來,直接可以使用
- Java-WebSocket Java實現的WebSocket協議
由於OkHttp用的多,這裡毫不猶豫的使用了OkHttp,下面我們看看基本用法API
官方測試地址
String url = "ws://echo.websocket.org";
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
.url(url)
.build();
client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
mWebSocket = webSocket;
super.onOpen(webSocket, response);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
t.printStackTrace();
super.onFailure(webSocket, t, response);
}
});
button.setOnClickListener((view) -> {
msg = "Hello!";
mWebSocket.send(msg);
});
用法非常簡單,從API可以看出雙向通訊與HTTP的不同,接下來我們更深入一些,主要看一下WebSocket的握手和資料收發
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random());
webSocket.connect(this);
return webSocket;
}
public RealWebSocket(Request request, WebSocketListener listener, Random random) {
if (!"GET".equals(request.method())) {
throw new IllegalArgumentException("Request must be GET: " + request.method());
}
this.originalRequest = request;
this.listener = listener;
this.random = random;
byte[] nonce = new byte[16];
random.nextBytes(nonce);
this.key = ByteString.of(nonce).base64();
this.writerRunnable = new Runnable() {
@Override public void run() {
try {
while (writeOneFrame()) {
}
} catch (IOException e) {
failWebSocket(e, null);
}
}
};
}
在WebSocket實現類RealWebSocket的構造方法中進行了初始化的操作,包括之前提到的握手請求頭部一個經Base64的隨機數,writerRunnable的作用是資料傳送。
然後呼叫connect方法開始建立連線
public void connect(OkHttpClient client) {
client = client.newBuilder()
.protocols(ONLY_HTTP1)
.build();
final int pingIntervalMillis = client.pingIntervalMillis();
final Request request = originalRequest.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.build();
call = Internal.instance.newWebSocketCall(client, request);
call.enqueue(new Callback() {
@Override public void onResponse(Call call, Response response) {
try {
checkResponse(response);
} catch (ProtocolException e) {
failWebSocket(e, response);
closeQuietly(response);
return;
}
// Promote the HTTP streams into web socket streams.
StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
streamAllocation.noNewStreams(); // Prevent connection pooling!
Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);
// Process all web socket messages.
try {
listener.onOpen(RealWebSocket.this, response);
String name = "OkHttp WebSocket " + request.url().redact();
initReaderAndWriter(name, pingIntervalMillis, streams);
streamAllocation.connection().socket().setSoTimeout(0);
loopReader();
} catch (Exception e) {
failWebSocket(e, null);
}
}
@Override public void onFailure(Call call, IOException e) {
failWebSocket(e, null);
}
});
}
這段程式碼涉及很多,我們來逐條看。
第一步發了一個符合WebSocket協議握手規範的HTTP請求,我們可以看到1.1協議版本和headers都和之前提到的一樣,然後看看checkResponse方法
void checkResponse(Response response) throws ProtocolException {
if (response.code() != 101) {
throw new ProtocolException("Expected HTTP 101 response but was '"
+ response.code() + " " + response.message() + "'");
}
String headerConnection = response.header("Connection");
if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
throw new ProtocolException("Expected 'Connection' header value 'Upgrade' but was '"
+ headerConnection + "'");
}
String headerUpgrade = response.header("Upgrade");
if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
throw new ProtocolException(
"Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
}
String headerAccept = response.header("Sec-WebSocket-Accept");
String acceptExpected = ByteString.encodeUtf8(key + WebSocketProtocol.ACCEPT_MAGIC)
.sha1().base64();
if (!acceptExpected.equals(headerAccept)) {
throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
+ acceptExpected + "' but was '" + headerAccept + "'");
}
}
也是一些協議的內容,如果有不符合規範的地方就會丟擲ProtocolException。
第二步,在檢查完成後,連線就算正式建立了,接下來要為資料的通訊做一些準備。我們來看看Streams是什麼
public abstract static class Streams implements Closeable {
public final boolean client;
public final BufferedSource source;
public final BufferedSink sink;
public Streams(boolean client, BufferedSource source, BufferedSink sink) {
this.client = client;
this.source = source;
this.sink = sink;
}
}
Streams封裝了BufferedSource和BufferedSink,這兩個類是抽象的,實現類是RealBufferedSource和RealBufferedSink,具體的初始化過程在StreamAllocation中,而StreamAllocation的初始化與OkHttp攔截器有關,這裡不多贅述,總之此時RealBufferedSource和RealBufferedSink都已初始化完成,封裝到Streams中。
第三步通過註冊的listener回調了onOpen函式。
第四步初始化Writer和Reader
public void initReaderAndWriter(
String name, long pingIntervalMillis, Streams streams) throws IOException {
synchronized (this) {
this.streams = streams;
this.writer = new WebSocketWriter(streams.client, streams.sink, random);
this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
if (pingIntervalMillis != 0) {
executor.scheduleAtFixedRate(
new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
}
if (!messageAndCloseQueue.isEmpty()) {
runWriter(); // Send messages that were enqueued before we were connected.
}
}
reader = new WebSocketReader(streams.client, streams.source, this);
}
主要就是將放進Streams的BufferedSource和BufferedSink加進去,因為實際的讀寫操作還是這倆來進行。
第五步就是loopReader()開啟訊息讀取迴圈
public void loopReader() throws IOException {
while (receivedCloseCode == -1) {
// This method call results in one or more onRead* methods being called on this thread.
reader.processNextFrame();
}
}
void processNextFrame() throws IOException {
readHeader();
if (isControlFrame) {
readControlFrame();
} else {
readMessageFrame();
}
}
我們先看看readHeader()方法
private void readHeader() throws IOException {
if (closed) throw new IOException("closed");
// Disable the timeout to read the first byte of a new frame.
int b0;
long timeoutBefore = source.timeout().timeoutNanos();
source.timeout().clearTimeout();
try {
b0 = source.readByte() & 0xff;
} finally {
source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS);
}
opcode = b0 & B0_MASK_OPCODE;
isFinalFrame = (b0 & B0_FLAG_FIN) != 0;
isControlFrame = (b0 & OPCODE_FLAG_CONTROL) != 0;
// Control frames must be final frames (cannot contain continuations).
if (isControlFrame && !isFinalFrame) {
throw new ProtocolException("Control frames must be final.");
}
boolean reservedFlag1 = (b0 & B0_FLAG_RSV1) != 0;
boolean reservedFlag2 = (b0 & B0_FLAG_RSV2) != 0;
boolean reservedFlag3 = (b0 & B0_FLAG_RSV3) != 0;
if (reservedFlag1 || reservedFlag2 || reservedFlag3) {
// Reserved flags are for extensions which we currently do not support.
throw new ProtocolException("Reserved flags are unsupported.");
}
int b1 = source.readByte() & 0xff;
isMasked = (b1 & B1_FLAG_MASK) != 0;
if (isMasked == isClient) {
// Masked payloads must be read on the server. Unmasked payloads must be read on the client.
throw new ProtocolException(isClient
? "Server-sent frames must not be masked."
: "Client-sent frames must be masked.");
}
// Get frame length, optionally reading from follow-up bytes if indicated by special values.
frameLength = b1 & B1_MASK_LENGTH;
if (frameLength == PAYLOAD_SHORT) {
frameLength = source.readShort() & 0xffffL; // Value is unsigned.
} else if (frameLength == PAYLOAD_LONG) {
frameLength = source.readLong();
if (frameLength < 0) {
throw new ProtocolException(
"Frame length 0x" + Long.toHexString(frameLength) + " > 0x7FFFFFFFFFFFFFFF");
}
}
frameBytesRead = 0;
if (isControlFrame && frameLength > PAYLOAD_BYTE_MAX) {
throw new ProtocolException("Control frame must be less than " + PAYLOAD_BYTE_MAX + "B.");
}
if (isMasked) {
// Read the masking key as bytes so that they can be used directly for unmasking.
source.readFully(maskKey);
}
}
有點多,著重看下source.readByte(),根據之前說的找到BufferSource的實現類,經過半天的呼叫鏈尋找,找到了最後在Okio類裡面建立的Soure、Sink匿名內部類的讀寫方法,這裡以讀為例
private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
try {
timeout.throwIfReached();
Segment tail = sink.writableSegment(1);
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) return -1;
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
} catch (AssertionError e) {
if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}
@Override public void close() throws IOException {
in.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "source(" + in + ")";
}
};
}
可以看出,最終呼叫了在最底層的socket的輸入流的read方法,這裡也是IO阻塞模型,等待接收訊息。到這裡連線的建立到訊息如何接收,已經差不多搞明白了,我們再來看下接收訊息後幀型別的判斷。
private void readControlFrame() throws IOException {
......
switch (opcode) {
case OPCODE_CONTROL_PING:
frameCallback.onReadPing(buffer.readByteString());
break;
case OPCODE_CONTROL_PONG:
frameCallback.onReadPong(buffer.readByteString());
break;
case OPCODE_CONTROL_CLOSE:
int code = CLOSE_NO_STATUS_CODE;
String reason = "";
long bufferSize = buffer.size();
if (bufferSize == 1) {
throw new ProtocolException("Malformed close payload length of 1.");
} else if (bufferSize != 0) {
code = buffer.readShort();
reason = buffer.readUtf8();
String codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code);
if (codeExceptionMessage != null) throw new ProtocolException(codeExceptionMessage);
}
frameCallback.onReadClose(code, reason);
closed = true;
break;
default:
throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
}
}
private void readMessageFrame() throws IOException {
......
if (opcode == OPCODE_TEXT) {
frameCallback.onReadMessage(message.readUtf8());
} else {
frameCallback.onReadMessage(message.readByteString());
}
}
這裡和我們在協議裡看到的一樣,對應Ping Pong Close Text Byte幀都會有相應的回撥(沒看到Continue幀),之後操作也遵循協議內容,篇幅有點長就不放程式碼了,比如Ping幀的回撥裡會發送一個Pong幀,傳送的邏輯在通過前面提到的writerRunnable裡,和接收類似,最終由Sink來執行。
簡單分析就到這裡了,有興趣的同學可以再進一步研究OkHttp原始碼。
作者:Misery_Dx
連結:https://www.jianshu.com/p/ba0f45aa7457
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。