1. 程式人生 > >websocket的那些事 - socket.io-client-java原始碼淺析

websocket的那些事 - socket.io-client-java原始碼淺析

此原始碼分析基於 socket.io-client-java  0.9.0

先來個 socket.io-client-java 客戶端例項:

import java.net.URISyntaxException;
import java.util.Scanner;

import com.itdreamer.util.DataUtil;

import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;

public class Client {
	
	 public static void main(String[] args) throws URISyntaxException, InterruptedException {
	        IO.Options options = new IO.Options();
	        options.transports = new String[]{"websocket"};
	        options.reconnectionAttempts = 2;
	        //失敗重連的時間間隔
	        options.reconnectionDelay = 1000;
	        //連線超時時間(ms)
	        options.timeout = 500;			
	 
	        final Socket socket = IO.socket("http://localhost:8090", options);
	 
	        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
	            @Override
	            public void call(Object... args) {
	            	System.out.println(DataUtil.getCurrentDataTime() + ":client connect! ");
	            	
	                socket.send("hello server, my name is client");
	            }
	        });
	 
	        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
	            @Override
	            public void call(Object... args) {
	                System.out.println(DataUtil.getCurrentDataTime() + ":client disconnect!");
	            }
	        });
	 
	        socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
	            @Override
	            public void call(Object... args) {
	                for (Object obj : args) {
	                    System.out.println(DataUtil.getCurrentDataTime() + ":receive server message="+obj);
	                }
	            }
	        });
	        socket.connect();
	        
	        System.out.println(DataUtil.getCurrentDataTime() + ":client console input......");
			Scanner sc = new Scanner(System.in);
	        while (sc.hasNext()) {
	            String message = sc.next();
	            System.out.println(DataUtil.getCurrentDataTime() + ":client console send data="+message);
	            
				socket.send(message);
	        }
	    }
}

socket.io-client-java 封裝得很不錯,只要簡單幾個方法,把自己的業務邏輯實現就搞定了。

本方從以下兩個方面進行分析:

1、socket 連線是如何建立的

2、服務端響應的資訊是如何接收並呼叫定義事件對應Listener的

socket 連線是如何建立的

連線門面物件

io.socket.client.Socket

入口方法

socket.connect();

 

    /**
     * Connects the socket.
     */
    public Socket connect() {
        return this.open();
    }

    /**
     * Connects the socket.
     */
    public Socket open() {
        EventThread.exec(new Runnable() {
            @Override
            public void run() {
                if (Socket.this.connected) return;

                Socket.this.subEvents();
                Socket.this.io.open(); // ensure open
                if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
                Socket.this.emit(EVENT_CONNECTING);
            }
        });
        return this;
    }

設定底層共用事件監聽器

io.socket.client.Manager

    public Manager open(){
        return open(null);
    }

/**
     * Connects the client.
     *
     * @param fn callback.
     * @return a reference to this object.
     */
    public Manager open(final OpenCallback fn) {
        EventThread.exec(new Runnable() {
            @Override
            public void run() {
                logger.fine(String.format("readyState %s", Manager.this.readyState));
                if (Manager.this.readyState == ReadyState.OPEN || Manager.this.readyState == ReadyState.OPENING) return;

                logger.fine(String.format("opening %s", Manager.this.uri));
                Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts);
                final io.socket.engineio.client.Socket socket = Manager.this.engine;
                final Manager self = Manager.this;
                Manager.this.readyState = ReadyState.OPENING;
                Manager.this.skipReconnect = false;
                
                // 新增一系列共用事件監聽器
                // propagate transport event.
                socket.on(Engine.EVENT_TRANSPORT, new Listener() {
                    @Override
                    public void call(Object... args) {
                        self.emit(Manager.EVENT_TRANSPORT, args);
                    }
                });

                final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() {
                    @Override
                    public void call(Object... objects) {
                        self.onopen();
                        if (fn != null) fn.call(null);
                    }
                });

                On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() {
                    @Override
                    public void call(Object... objects) {
                        Object data = objects.length > 0 ? objects[0] : null;
                        logger.fine("connect_error");
                        self.cleanup();
                        self.readyState = ReadyState.CLOSED;
                        self.emitAll(EVENT_CONNECT_ERROR, data);
                        if (fn != null) {
                            Exception err = new SocketIOException("Connection error",
                                    data instanceof Exception ? (Exception) data : null);
                            fn.call(err);
                        } else {
                            // Only do this if there is no fn to handle the error
                            self.maybeReconnectOnOpen();
                        }
                    }
                });

                if (Manager.this._timeout >= 0) {
                    final long timeout = Manager.this._timeout;
                    logger.fine(String.format("connection attempt will timeout after %d", timeout));

                    final Timer timer = new Timer();
                    timer.schedule(new TimerTask() {
                        @Override
                        public void run() {
                            EventThread.exec(new Runnable() {
                                @Override
                                public void run() {
                                    logger.fine(String.format("connect attempt timed out after %d", timeout));
                                    openSub.destroy();
                                    socket.close();
                                    socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout"));
                                    self.emitAll(EVENT_CONNECT_TIMEOUT, timeout);
                                }
                            });
                        }
                    }, timeout);

                    Manager.this.subs.add(new On.Handle() {
                        @Override
                        public void destroy() {
                            timer.cancel();
                        }
                    });
                }

                Manager.this.subs.add(openSub);
                Manager.this.subs.add(errorSub);

                //建立連線
                Manager.this.engine.open();
            }
        });
        return this;
    }

根據transportName建立 Transport 傳輸物件

io.socket.engineio.client.Socket

/**
     * Connects the client.
     *
     * @return a reference to to this object.
     */
    public Socket open() {
        EventThread.exec(new Runnable() {
            @Override
            public void run() {
                String transportName;
                if (Socket.this.rememberUpgrade && Socket.priorWebsocketSuccess && Socket.this.transports.contains(WebSocket.NAME)) {
                    transportName = WebSocket.NAME;
                } else if (0 == Socket.this.transports.size()) {
                    // Emit error on next tick so it can be listened to
                    final Socket self = Socket.this;
                    EventThread.nextTick(new Runnable() {
                        @Override
                        public void run() {
                            self.emit(Socket.EVENT_ERROR, new EngineIOException("No transports available"));
                        }
                    });
                    return;
                } else {
                    transportName = Socket.this.transports.get(0);
                }
                Socket.this.readyState = ReadyState.OPENING;
                // 根據transportName建立具體Transport物件,這裡是WebSocket
                Transport transport = Socket.this.createTransport(transportName);
                Socket.this.setTransport(transport);

                //開啟連線
                transport.open();
            }
        });
        return this;
    }

    private Transport createTransport(String name) {
        logger.fine(String.format("creating transport '%s'", name));
        Map<String, String> query = new HashMap<String, String>(this.query);

        query.put("EIO", String.valueOf(Parser.PROTOCOL));
        query.put("transport", name);
        if (this.id != null) {
            query.put("sid", this.id);
        }

        Transport.Options opts = new Transport.Options();
        opts.hostname = this.hostname;
        opts.port = this.port;
        opts.secure = this.secure;
        opts.path = this.path;
        opts.query = query;
        opts.timestampRequests = this.timestampRequests;
        opts.timestampParam = this.timestampParam;
        opts.policyPort = this.policyPort;
        opts.socket = this;
        opts.callFactory = this.callFactory;
        opts.webSocketFactory = this.webSocketFactory;

        // 這裡 look ...........
        Transport transport;
        if (WebSocket.NAME.equals(name)) {
            transport = new WebSocket(opts);
        } else if (Polling.NAME.equals(name)) {
            transport = new PollingXHR(opts);
        } else {
            throw new RuntimeException();
        }

        this.emit(EVENT_TRANSPORT, transport);

        return transport;
    }

呼叫Transport實現類WebSocket物件建立連線

io.socket.engineio.client.Transport

    public Transport open() {
        EventThread.exec(new Runnable() {
            @Override
            public void run() {
                if (Transport.this.readyState == ReadyState.CLOSED || Transport.this.readyState == null) {
                    Transport.this.readyState = ReadyState.OPENING;
                    Transport.this.doOpen();
                }
            }
        });
        return this;
    }

呼叫okhttp3進行連線建立

io.socket.engineio.client.transports.WebSocket

protected void doOpen() {
        Map<String, List<String>> headers = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
        this.emit(EVENT_REQUEST_HEADERS, headers);

        final WebSocket self = this;
        okhttp3.WebSocket.Factory factory = webSocketFactory != null ? webSocketFactory : new OkHttpClient();
        // 請求連結設定 uri()
        Request.Builder builder = new Request.Builder().url(uri());
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            for (String v : entry.getValue()) {
                builder.addHeader(entry.getKey(), v);
            }
        }
        final Request request = builder.build();
        ws = factory.newWebSocket(request, new WebSocketListener() {
            // 開啟連線
            @Override
            public void onOpen(okhttp3.WebSocket webSocket, Response response) {
                final Map<String, List<String>> headers = response.headers().toMultimap();
                EventThread.exec(new Runnable() {
                    @Override
                    public void run() {
                        self.emit(EVENT_RESPONSE_HEADERS, headers);
                        self.onOpen();
                    }
                });
            }
            // 接收字元訊息
            @Override
            public void onMessage(okhttp3.WebSocket webSocket, final String text) {
                if (text == null) {
                    return;
                }
                EventThread.exec(new Runnable() {
                    @Override
                    public void run() {
                    self.onData(text);
                    }
                });
            }
            // 接收byte訊息
            @Override
            public void onMessage(okhttp3.WebSocket webSocket, final ByteString bytes) {
                if (bytes == null) {
                    return;
                }
                EventThread.exec(new Runnable() {
                    @Override
                    public void run() {
                        self.onData(bytes.toByteArray());
                    }
                });
            }
            // 關閉連線
            @Override
            public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {
                EventThread.exec(new Runnable() {
                    @Override
                    public void run() {
                        self.onClose();
                    }
                });
            }
            // 異常處理
            @Override
            public void onFailure(okhttp3.WebSocket webSocket, final Throwable t, Response response) {
                if (!(t instanceof Exception)) {
                    return;
                }
                EventThread.exec(new Runnable() {
                    @Override
                    public void run() {
                        self.onError("websocket error", (Exception) t);
                    }
                });
            }
        });
    }

    // 組合請求連結
    protected String uri() {
        Map<String, String> query = this.query;
        if (query == null) {
            query = new HashMap<String, String>();
        }
        String schema = this.secure ? "wss" : "ws";
        String port = "";

        if (this.port > 0 && (("wss".equals(schema) && this.port != 443)
                || ("ws".equals(schema) && this.port != 80))) {
            port = ":" + this.port;
        }

        if (this.timestampRequests) {
            query.put(this.timestampParam, Yeast.yeast());
        }

        String derivedQuery = ParseQS.encode(query);
        if (derivedQuery.length() > 0) {
            derivedQuery = "?" + derivedQuery;
        }

        boolean ipv6 = this.hostname.contains(":");
        return schema + "://" + (ipv6 ? "[" + this.hostname + "]" : this.hostname) + port + this.path + derivedQuery;
    }

okhttp3連線建立[實際建立連線處]

請看 --》okhttp websocket原始碼篇

至此連線建立流程結束

 

服務端響應資訊是如何接收並呼叫定義事件對應Listener

        從上面連線的建立 okhttp3WebSocket 部分可以看出,okhttp3在連線建立後會執行一個輪詢讀取websocket訊息的方法

/** 開啟輪詢讀取websocket的訊息 */
          loopReader();

而實際收到訊息後,會呼叫 WebSocket 監控器類定義的 onMessage 方法執行響應訊息回撥,如果是字元資訊會是json字串

以下物件完成響應訊息業務處理方法執行

io.socket.emitter.Emitter

    protected void onData(String data) {
        this.onPacket(Parser.decodePacket(data));
    }

    protected void onPacket(Packet packet) {
        this.emit(EVENT_PACKET, packet);
    }
    
    /**
     * 實際回撥響應訊息業務處理方法
     *
     * Executes each of listeners with the given args.
     *
     * @param event an event name.
     * @param args
     * @return a reference to this object.
     */
    public Emitter emit(String event, Object... args) {
        ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event);
        if (callbacks != null) {
            for (Listener fn : callbacks) {
                fn.call(args);
            }
        }
        return this;
    }

總結

       從整體來看,socketio-client-java是對okttp3進行封裝,隱藏掉底層實現,讓我們只需呼叫簡單API就能完成業務實現