websocket的那些事 - socket.io-client-java原始碼淺析
阿新 • • 發佈:2018-12-10
此原始碼分析基於 socket.io-client-java 0.9.0
先來個 socket.io-client-java 客戶端例項:
import java.net.URISyntaxException; import java.util.Scanner; import com.itdreamer.util.DataUtil; import io.socket.client.IO; import io.socket.client.Socket; import io.socket.emitter.Emitter; public class Client { public static void main(String[] args) throws URISyntaxException, InterruptedException { IO.Options options = new IO.Options(); options.transports = new String[]{"websocket"}; options.reconnectionAttempts = 2; //失敗重連的時間間隔 options.reconnectionDelay = 1000; //連線超時時間(ms) options.timeout = 500; final Socket socket = IO.socket("http://localhost:8090", options); socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { @Override public void call(Object... args) { System.out.println(DataUtil.getCurrentDataTime() + ":client connect! "); socket.send("hello server, my name is client"); } }); socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() { @Override public void call(Object... args) { System.out.println(DataUtil.getCurrentDataTime() + ":client disconnect!"); } }); socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() { @Override public void call(Object... args) { for (Object obj : args) { System.out.println(DataUtil.getCurrentDataTime() + ":receive server message="+obj); } } }); socket.connect(); System.out.println(DataUtil.getCurrentDataTime() + ":client console input......"); Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String message = sc.next(); System.out.println(DataUtil.getCurrentDataTime() + ":client console send data="+message); socket.send(message); } } }
socket.io-client-java 封裝得很不錯,只要簡單幾個方法,把自己的業務邏輯實現就搞定了。
本方從以下兩個方面進行分析:
1、socket 連線是如何建立的
2、服務端響應的資訊是如何接收並呼叫定義事件對應Listener的
socket 連線是如何建立的
連線門面物件
io.socket.client.Socket
入口方法
socket.connect();
/** * Connects the socket. */ public Socket connect() { return this.open(); } /** * Connects the socket. */ public Socket open() { EventThread.exec(new Runnable() { @Override public void run() { if (Socket.this.connected) return; Socket.this.subEvents(); Socket.this.io.open(); // ensure open if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen(); Socket.this.emit(EVENT_CONNECTING); } }); return this; }
設定底層共用事件監聽器
io.socket.client.Manager
public Manager open(){ return open(null); } /** * Connects the client. * * @param fn callback. * @return a reference to this object. */ public Manager open(final OpenCallback fn) { EventThread.exec(new Runnable() { @Override public void run() { logger.fine(String.format("readyState %s", Manager.this.readyState)); if (Manager.this.readyState == ReadyState.OPEN || Manager.this.readyState == ReadyState.OPENING) return; logger.fine(String.format("opening %s", Manager.this.uri)); Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts); final io.socket.engineio.client.Socket socket = Manager.this.engine; final Manager self = Manager.this; Manager.this.readyState = ReadyState.OPENING; Manager.this.skipReconnect = false; // 新增一系列共用事件監聽器 // propagate transport event. socket.on(Engine.EVENT_TRANSPORT, new Listener() { @Override public void call(Object... args) { self.emit(Manager.EVENT_TRANSPORT, args); } }); final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() { @Override public void call(Object... objects) { self.onopen(); if (fn != null) fn.call(null); } }); On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() { @Override public void call(Object... objects) { Object data = objects.length > 0 ? objects[0] : null; logger.fine("connect_error"); self.cleanup(); self.readyState = ReadyState.CLOSED; self.emitAll(EVENT_CONNECT_ERROR, data); if (fn != null) { Exception err = new SocketIOException("Connection error", data instanceof Exception ? (Exception) data : null); fn.call(err); } else { // Only do this if there is no fn to handle the error self.maybeReconnectOnOpen(); } } }); if (Manager.this._timeout >= 0) { final long timeout = Manager.this._timeout; logger.fine(String.format("connection attempt will timeout after %d", timeout)); final Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { EventThread.exec(new Runnable() { @Override public void run() { logger.fine(String.format("connect attempt timed out after %d", timeout)); openSub.destroy(); socket.close(); socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout")); self.emitAll(EVENT_CONNECT_TIMEOUT, timeout); } }); } }, timeout); Manager.this.subs.add(new On.Handle() { @Override public void destroy() { timer.cancel(); } }); } Manager.this.subs.add(openSub); Manager.this.subs.add(errorSub); //建立連線 Manager.this.engine.open(); } }); return this; }
根據transportName建立 Transport 傳輸物件
io.socket.engineio.client.Socket
/**
* Connects the client.
*
* @return a reference to to this object.
*/
public Socket open() {
EventThread.exec(new Runnable() {
@Override
public void run() {
String transportName;
if (Socket.this.rememberUpgrade && Socket.priorWebsocketSuccess && Socket.this.transports.contains(WebSocket.NAME)) {
transportName = WebSocket.NAME;
} else if (0 == Socket.this.transports.size()) {
// Emit error on next tick so it can be listened to
final Socket self = Socket.this;
EventThread.nextTick(new Runnable() {
@Override
public void run() {
self.emit(Socket.EVENT_ERROR, new EngineIOException("No transports available"));
}
});
return;
} else {
transportName = Socket.this.transports.get(0);
}
Socket.this.readyState = ReadyState.OPENING;
// 根據transportName建立具體Transport物件,這裡是WebSocket
Transport transport = Socket.this.createTransport(transportName);
Socket.this.setTransport(transport);
//開啟連線
transport.open();
}
});
return this;
}
private Transport createTransport(String name) {
logger.fine(String.format("creating transport '%s'", name));
Map<String, String> query = new HashMap<String, String>(this.query);
query.put("EIO", String.valueOf(Parser.PROTOCOL));
query.put("transport", name);
if (this.id != null) {
query.put("sid", this.id);
}
Transport.Options opts = new Transport.Options();
opts.hostname = this.hostname;
opts.port = this.port;
opts.secure = this.secure;
opts.path = this.path;
opts.query = query;
opts.timestampRequests = this.timestampRequests;
opts.timestampParam = this.timestampParam;
opts.policyPort = this.policyPort;
opts.socket = this;
opts.callFactory = this.callFactory;
opts.webSocketFactory = this.webSocketFactory;
// 這裡 look ...........
Transport transport;
if (WebSocket.NAME.equals(name)) {
transport = new WebSocket(opts);
} else if (Polling.NAME.equals(name)) {
transport = new PollingXHR(opts);
} else {
throw new RuntimeException();
}
this.emit(EVENT_TRANSPORT, transport);
return transport;
}
呼叫Transport實現類WebSocket物件建立連線
io.socket.engineio.client.Transport
public Transport open() {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (Transport.this.readyState == ReadyState.CLOSED || Transport.this.readyState == null) {
Transport.this.readyState = ReadyState.OPENING;
Transport.this.doOpen();
}
}
});
return this;
}
呼叫okhttp3進行連線建立
io.socket.engineio.client.transports.WebSocket
protected void doOpen() {
Map<String, List<String>> headers = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
this.emit(EVENT_REQUEST_HEADERS, headers);
final WebSocket self = this;
okhttp3.WebSocket.Factory factory = webSocketFactory != null ? webSocketFactory : new OkHttpClient();
// 請求連結設定 uri()
Request.Builder builder = new Request.Builder().url(uri());
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
for (String v : entry.getValue()) {
builder.addHeader(entry.getKey(), v);
}
}
final Request request = builder.build();
ws = factory.newWebSocket(request, new WebSocketListener() {
// 開啟連線
@Override
public void onOpen(okhttp3.WebSocket webSocket, Response response) {
final Map<String, List<String>> headers = response.headers().toMultimap();
EventThread.exec(new Runnable() {
@Override
public void run() {
self.emit(EVENT_RESPONSE_HEADERS, headers);
self.onOpen();
}
});
}
// 接收字元訊息
@Override
public void onMessage(okhttp3.WebSocket webSocket, final String text) {
if (text == null) {
return;
}
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onData(text);
}
});
}
// 接收byte訊息
@Override
public void onMessage(okhttp3.WebSocket webSocket, final ByteString bytes) {
if (bytes == null) {
return;
}
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onData(bytes.toByteArray());
}
});
}
// 關閉連線
@Override
public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onClose();
}
});
}
// 異常處理
@Override
public void onFailure(okhttp3.WebSocket webSocket, final Throwable t, Response response) {
if (!(t instanceof Exception)) {
return;
}
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onError("websocket error", (Exception) t);
}
});
}
});
}
// 組合請求連結
protected String uri() {
Map<String, String> query = this.query;
if (query == null) {
query = new HashMap<String, String>();
}
String schema = this.secure ? "wss" : "ws";
String port = "";
if (this.port > 0 && (("wss".equals(schema) && this.port != 443)
|| ("ws".equals(schema) && this.port != 80))) {
port = ":" + this.port;
}
if (this.timestampRequests) {
query.put(this.timestampParam, Yeast.yeast());
}
String derivedQuery = ParseQS.encode(query);
if (derivedQuery.length() > 0) {
derivedQuery = "?" + derivedQuery;
}
boolean ipv6 = this.hostname.contains(":");
return schema + "://" + (ipv6 ? "[" + this.hostname + "]" : this.hostname) + port + this.path + derivedQuery;
}
okhttp3連線建立[實際建立連線處]
請看 --》okhttp websocket原始碼篇
至此連線建立流程結束
服務端響應資訊是如何接收並呼叫定義事件對應Listener
從上面連線的建立 okhttp3、WebSocket 部分可以看出,okhttp3在連線建立後會執行一個輪詢讀取websocket訊息的方法
/** 開啟輪詢讀取websocket的訊息 */ loopReader();
而實際收到訊息後,會呼叫 WebSocket 監控器類定義的 onMessage 方法執行響應訊息回撥,如果是字元資訊會是json字串
以下物件完成響應訊息業務處理方法執行
io.socket.emitter.Emitter
protected void onData(String data) {
this.onPacket(Parser.decodePacket(data));
}
protected void onPacket(Packet packet) {
this.emit(EVENT_PACKET, packet);
}
/**
* 實際回撥響應訊息業務處理方法
*
* Executes each of listeners with the given args.
*
* @param event an event name.
* @param args
* @return a reference to this object.
*/
public Emitter emit(String event, Object... args) {
ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event);
if (callbacks != null) {
for (Listener fn : callbacks) {
fn.call(args);
}
}
return this;
}
總結
從整體來看,socketio-client-java是對okttp3進行封裝,隱藏掉底層實現,讓我們只需呼叫簡單API就能完成業務實現