1. 程式人生 > >WebSocket協議探究(二)

WebSocket協議探究(二)

一 複習和目標

1 複習

  • 協議概述:
    • WebSocket內建訊息定界並且全雙工通訊
    • WebSocket使用HTTP進行協議協商,協商成功使用TCP連線進行傳輸資料
    • WebScoket資料格式支援二進位制和文字
  • 初始握手和計算響應鍵值
  • 訊息格式
  • 關閉握手

2 目標

  • Nodejs實現WebSocket伺服器
  • Netty實現WebSocket伺服器
  • Js api實現WebSocket客戶端

二 Nodejs實現WebScoket伺服器

1 概述

  • Node.js 使用事件驅動, 非阻塞I/O 模型而得以輕量和高效,非常適合在分散式裝置上執行資料密集型的實時應用。
  • 程式碼摘抄自《HTML5 WebSocket權威指南》
    • http模組:http伺服器
    • events模組:事件發生器(事件監聽與發射)
    • crypto模組:加密模組(sha1雜湊函式與base64編碼)
    • util模組:常用工具類(繼承)

2 程式碼

  • 伺服器程式碼:websocket-example.js
var events = require("events");
var http = require("http");
var crypto = require("crypto");
var util = require("util");

// 操作碼
var opcodes = {
    TEXT: 1,
    BINARY: 2,
    CLOSE: 8,
    PING: 9,
    PONG: 10
};

var WebSocketConnection = function(req, socket, upgradeHead) {
    var self = this;

    var key = hashWebSocketKey(req.headers['sec-websocket-key']);
    
    // 建立連線
    socket.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
        'Upgrade: WebSocket\r\n'  +
        'Connection: Upgrade\r\n' +
        'sec-websocket-accept: ' + key + '\r\n' +
        '\r\n'
    );

    socket.on('data', function (buf) {
        self.buffer = Buffer.concat([self.buffer, buf]);
        while (self._processBuffer()) {
            /// process buffer while it contains complete frames
        }
    });

    socket.on('close', function (buf) {
        if (!self.closed) {
            // 自定義錯誤
            self.emit('close', 1006);
            self.closed = true;
        }
    });

    // initialize connection state
    this.socket = socket;
    this.buffer = new Buffer(0);
    this.closed = false;
}

// 繼承events.EventEmitter
util.inherits(WebSocketConnection, events.EventEmitter);

WebSocketConnection.prototype.send = function (obj) {
    var opcode;
    var payload;
    if (Buffer.isBuffer(obj)) {
        opcode = opcodes.BINARY;
        payload = obj;
    } else if (typeof obj == 'string') {
        opcode = opcodes.TEXT;
        payload = new Buffer(obj, 'utf8');
    } else {
        throw new Error('Cannot send object.Must be string or Buffer.');
    }
    this._doSend(opcode, payload);
}

WebSocketConnection.prototype.close = function (code, reason) {
    var opcode = opcodes.CLOSE;
    var buffer;

    if (code) {
        buffer = new Buffer(Buffer.byteLength(reason) + 2);
        buffer.writeUInt16BE(code, 0);
        buffer.write(reason, 2);
    } else {
        buffer = new Buffer(0);
    }
    this._doSend(opcode, buffer);
    this.closed = true;
}


WebSocketConnection.prototype._processBuffer = function () {
    var buf = this.buffer;

    if (buf.length < 2) return;

    var idx = 2;
    var b1 = buf.readUInt8(0); 
    var fin = b1 & 0x80; // fin
    var opcode = b1 & 0x0f; // 操作碼
    var b2 = buf.readUInt8(1);
    var mask = b2 & 0x80; // 掩碼
    var length = b2 & 0x7f; // 長度

    if (length > 125) {
        if (buf.length < 8) return;

        if (length == 126) {
            length = buf.readUInt16BE(2);
            idx += 2;
        } else if (length == 127) {
            var highBits = buf.readUInt32BE(2);
            if (highBits != 0) {
                // 1009代表訊息過大
                this.close(1009, "");
            }
            length = buf.readUInt32BE(6);
            idx += 8;
        }
    }

    if (buf.length < idx + 4 + length) {
        return;
    }

    maskBytes = buf.slice(idx, idx + 4);
    idx += 4;
    var payload = buf.slice(idx, idx + length);
    payload = unmask(maskBytes, payload);
    this._handleFrame(opcode, payload);
    this.buffer = buf.slice(idx + length); // 資料清空
    return true;
}

// 處理WebSocket幀
WebSocketConnection.prototype._handleFrame = function (opcode, buffer) {
    var payload;
    switch (opcode) {
        case opcodes.TEXT:
            payload = buffer.toString('utf8');
            // 傳送接收資料事件
            this.emit('data', opcode, payload);
            break;
        case opcodes.BINARY:
            payload = buffer;
            // 傳送接收資料事件
            this.emit('data', opcode, payload);
            break;
        case opcodes.PING:
            this._doSend(opcodes.PONG, buffer);
            break;
        case opcodes.PONG:
            // 
            break;
        case opcodes.CLOSE:
            var code, reason;
            if (buffer.length >= 2) {
                code = buffer.readUInt16BE(0);
                reason = buffer.toString('utf8', 2);
            }
            this.close(code, reason);
            // 傳送close事件
            this.emit('close', code, reason);
            break;
        default:
            // 1002代表協議錯誤
            this.close(1002, 'unknown opcode');
    }
}

WebSocketConnection.prototype._doSend = function (opcode, payload) {
    // 基於TCP傳送資料
    this.socket.write(encodeMessage(opcode, payload));
}

// 計算sec-websocket-accept值
var hashWebSocketKey = function (key) {
    var sha1 = crypto.createHash('sha1');
    sha1.update(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", 'ascii');
    return sha1.digest('base64');
}

// 資料掩碼解析
var unmask = function (maskBytes, data) {
    var payload = new Buffer(data.length);
    for (var i = 0; i < data.length; i++) {
        payload[i] = maskBytes[i % 4] ^ data[i];
    }
    return payload;
}

// 傳送資料封裝
var encodeMessage = function (opcode, payload) {
    var buf;
    var b1 = 0x80 | opcode;  // fin置為1
    var b2 = 0; // 沒有掩碼
    var length = payload.length;
    if (length < 126) {
        buf = new Buffer(payload.length + 2 + 0);
        b2 |= length;
        buf.writeUInt8(b1, 0);
        buf.writeUInt8(b2, 1);
        payload.copy(buf, 2);
    } else if (length < (1 << 16)) { // 
        buf = new Buffer(payload.length + 2 + 2);
        b2 |= 126;
        buf.writeUInt8(b1, 0);
        buf.writeUInt8(b2, 1);
        buf.writeUInt16BE(length, 2);
        payload.copy(buf, 4);
    } else {
        buf = new Buffer(payload + 2 + 8);
        b2 |= 127;
        buf.writeUInt8(b1, 0);
        buf.writeUInt8(b2, 1);
        buf.writeUInt32BE(0, 2); // 必需為0 
        buf.writeUInt32BE(length, 6);
        payload.copy(buf, 10);
    }
    return buf;
}

exports.listen = function (port, host, connectionHandler) {
    var srv = http.createServer(function (req, res) {});
    
    // 監聽upgrade事件並生成WebScoket連線
    srv.on('upgrade', function (req, socket, upgradeHead) {
        var ws = new WebSocketConnection(req, socket, upgradeHead);
        connectionHandler(ws);
    });
    
    srv.listen(port, host);
}
  • 使用示例:echo.js
var websocket = require('./websocket-example');

websocket.listen(9999,"localhost",function(conn){
    console.log("connenction opened");
    
    conn.on('data',function(opcode,data){
        console.log('message:',data);
        conn.send(data);
    });

    conn.on('close',function(code,reason){
        console.log("connection closed:", code , reason);
    });
});

注:不得不佩服nodejs程式碼的簡潔與易讀性,之前專案開發過nodejs的支付SDK,就發現nodejs的魅力,希望你也可以希望上它。

三 Netty實現WebSocket伺服器

1 概述

  • Netty是由JBOSS提供的一個java開源框架。
  • Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。
  • Netty自身對WebSocket協議就有支援,所以編寫起來十分簡單。

2 程式碼

  • WebSocketServer類
public class WebSocketServer {
    
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new WebSocketInitalizer());
            ChannelFuture channelFuture = serverBootstrap
                .bind("localhost",9999).sync();
            channelFuture.channel().closeFuture().sync();
        }finally{
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • WebSocketInitalizer類
public class WebSocketInitalizer extends ChannelInitializer<SocketChannel> {
    
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new ChunkedWriteHandler());
        // 這個是最重要的Handler,後面稍微跟蹤一下原始碼
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new MyWebSocketHandler());
    }
}
  • MyWebSocketHandler類
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + ": " + msg.text());
        ctx.channel().writeAndFlush(msg);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("login: " + ctx.channel().id().asLongText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("logout: " + ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}

3 原始碼分析

(1)類的跟蹤

  • WebSocketServerProtocolHandler類
  • WebSocketServerProtocolHandshakeHandler類
  • WebSocketServerHandshakerFactory類:建立WebSocketServerHandshaker的實現類
  • WebSocketServerHandshaker13.newHandshakeResponse()方法
protected FullHttpResponse newHandshakeResponse(FullHttpRequest req, HttpHeaders headers) {
    FullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS); // http1.1 101 Switching Protocols
    if (headers != null) {
        res.headers().add(headers);
    }

    CharSequence key = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_KEY);
    if (key == null) {
        throw new WebSocketHandshakeException("not a WebSocket request: missing key");
    }
    
    // WEBSOCKET_13_ACCEPT_GUID為"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
    String acceptSeed = key + WEBSOCKET_13_ACCEPT_GUID;
    byte[] sha1 = WebSocketUtil.sha1(acceptSeed.getBytes(CharsetUtil.US_ASCII));
    String accept = WebSocketUtil.base64(sha1);

    res.headers().add(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET);
    res.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE);
    res.headers().add(HttpHeaderNames.SEC_WEBSOCKET_ACCEPT, accept);

    String subprotocols = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);
    if (subprotocols != null) {
        // 伺服器挑選子協議
        String selectedSubprotocol = selectSubprotocol(subprotocols);
        if (selectedSubprotocol == null) {
            
        } else {
            res.headers().add(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, selectedSubprotocol);
        }
    }
    return res;
}
  • WebSocketFrameDecoder介面的實現類中的方法:解碼客戶端傳送過來的訊息,與nodejs實現類似,只不過更為詳細。
// 程式碼刪減了一些細節
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out){
    switch (state) {
        case READING_FIRST:
            framePayloadLength = 0;
            byte b = in.readByte();
            frameFinalFlag = (b & 0x80) != 0; // FIN
            frameRsv = (b & 0x70) >> 4; // RSV
            frameOpcode = b & 0x0F; // OPCODE
            state = State.READING_SECOND;
        case READING_SECOND:
            b = in.readByte();
            frameMasked = (b & 0x80) != 0;  // MASK
            framePayloadLen1 = b & 0x7F; // LEN
            if (frameRsv != 0 && !allowExtensions) { // SRV當不允許拓展時必須為0
                protocolViolation(ctx, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
                return;
            }

            if (!allowMaskMismatch && expectMaskedFrames != frameMasked) {
                protocolViolation(ctx, "received a frame that is not masked as expected");
                return;
            }

            if (frameOpcode > 7) { // 控制幀 8 9 10
                if (!frameFinalFlag) { // 控制幀FIN必需為true
                    protocolViolation(ctx, "fragmented control frame");
                    return;
                }

                if (framePayloadLen1 > 125) { // 控制幀長度不可能大於125
                    protocolViolation(ctx, "control frame with payload length > 125 octets");
                    return;
                }

                // OPCODE如果不等8/9/10直接返回錯誤
                if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING
                      || frameOpcode == OPCODE_PONG)) {
                    protocolViolation(ctx, "control frame using reserved opcode " + frameOpcode);
                    return;
                }
                
                // 關閉連線時長度錯誤
                if (frameOpcode == 8 && framePayloadLen1 == 1) {
                    protocolViolation(ctx, "received close control frame with payload len 1");
                    return;
                }
            } else { // OPCODE為資料幀 0連線 1文字 2二進位制
                if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT
                      || frameOpcode == OPCODE_BINARY)) {
                    protocolViolation(ctx, "data frame using reserved opcode " + frameOpcode);
                    return;
                }

                // check opcode vs message fragmentation state 1/2
                if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {
                    protocolViolation(ctx, "received continuation data frame outside fragmented message");
                    return;
                }

                // check opcode vs message fragmentation state 2/2
                if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) {
                    protocolViolation(ctx,
                                      "received non-continuation data frame while inside fragmented message");
                    return;
                }
            }

            state = State.READING_SIZE;
        case READING_SIZE:
            // Read frame payload length
            if (framePayloadLen1 == 126) {
                if (in.readableBytes() < 2) {
                    return;
                }
                framePayloadLength = in.readUnsignedShort();
                if (framePayloadLength < 126) {
                    protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)");
                    return;
                }
            } else if (framePayloadLen1 == 127) {
                if (in.readableBytes() < 8) {
                    return;
                }
                framePayloadLength = in.readLong();
                if (framePayloadLength < 65536) {
                    protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)");
                    return;
                }
            } else {
                framePayloadLength = framePayloadLen1;
            }
            
            // 超出範圍
            if (framePayloadLength > maxFramePayloadLength) {
                protocolViolation(ctx, "Max frame length of " + maxFramePayloadLength + " has been exceeded.");
                return;
            }
            state = State.MASKING_KEY;
        case MASKING_KEY:
            if (frameMasked) {
                if (in.readableBytes() < 4) {
                    return;
                }
                if (maskingKey == null) {
                    maskingKey = new byte[4];
                }
                in.readBytes(maskingKey);
            }
            state = State.PAYLOAD;
        case PAYLOAD:
            if (in.readableBytes() < framePayloadLength) {
                return;
            }

            ByteBuf payloadBuffer = null;
            try {
                payloadBuffer = readBytes(ctx.alloc(), in, toFrameLength(framePayloadLength));
                state = State.READING_FIRST;
                
                if (frameMasked) {
                    unmask(payloadBuffer); // 掩碼解析
                }
                
                if (frameOpcode == OPCODE_PING) {
                    out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                }
                if (frameOpcode == OPCODE_PONG) {
                    out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                }
                if (frameOpcode == OPCODE_CLOSE) {
                    receivedClosingHandshake = true;
                    checkCloseFrameBody(ctx, payloadBuffer);
                    out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                }

                if (frameFinalFlag) {
                    if (frameOpcode != OPCODE_PING) {
                        fragmentedFramesCount = 0;
                    }
                } else {
                    fragmentedFramesCount++;
                }

                if (frameOpcode == OPCODE_TEXT) {
                    out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                } else if (frameOpcode == OPCODE_BINARY) {
                    out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                    payloadBuffer = null;
                    return;
                } else if (frameOpcode == OPCODE_CONT) {
                    out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv,
                                                           payloadBuffer));
                    payloadBuffer = null;
                    return;
                } else {
                    throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: "
                                                            + frameOpcode);
                }
            } finally {
                if (payloadBuffer != null) {
                    payloadBuffer.release();
                }
            }
        case CORRUPT: // 繼續讀資料
            if (in.isReadable()) {
                in.readByte();
            }
            return;
        default:
            throw new Error("Shouldn't reach here.");
    }
}


// 目前看不太懂明白這段程式碼,因為對ByteBuf不瞭解,有點類似Nio的Buffer
//  +-------------------+------------------+------------------+
//  | discardable bytes |  readable bytes  |  writable bytes  |
//  +-------------------+------------------+------------------+
//  |                   |                  |                  |
//  0      <=      readerIndex   <=   writerIndex    <=    capacity
//  我覺得應該是一個個byte解析太慢了,於是netty直接用int(4個byte)同時解析,最後不滿一個 
//  int才使用最原始的一個個byte解析。
private void unmask(ByteBuf frame) {
    int i = frame.readerIndex();
    int end = frame.writerIndex();

    ByteOrder order = frame.order();

    int intMask = ((maskingKey[0] & 0xFF) << 24)
        | ((maskingKey[1] & 0xFF) << 16)
        | ((maskingKey[2] & 0xFF) << 8)
        | (maskingKey[3] & 0xFF);

    if (order == ByteOrder.LITTLE_ENDIAN) {
        intMask = Integer.reverseBytes(intMask);
    }

    for (; i + 3 < end; i += 4) {
        frame.setInt(i, frame.getInt(i) ^ intMask);
    }
    for (; i < end; i++) {
        frame.setByte(i, frame.getByte(i) ^ maskingKey[i % 4]);
    }
}
  • WebSocketFrameEncoder類:傳送資料時進行編碼,估計用於實現WebSocket客戶端,因為WebSocket伺服器傳送資料時,不需要用到掩碼。
protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) {
     final ByteBuf data = msg.content();
     byte[] mask;

     byte opcode;
     if (msg instanceof TextWebSocketFrame) {
         opcode = OPCODE_TEXT;
     } else if (msg instanceof PingWebSocketFrame) {
         opcode = OPCODE_PING;
     } else if (msg instanceof PongWebSocketFrame) {
         opcode = OPCODE_PONG;
     } else if (msg instanceof CloseWebSocketFrame) {
         opcode = OPCODE_CLOSE;
     } else if (msg instanceof BinaryWebSocketFrame) {
         opcode = OPCODE_BINARY;
     } else if (msg instanceof ContinuationWebSocketFrame) {
         opcode = OPCODE_CONT;
     } else {
         throw new UnsupportedOperationException("Cannot encode frame of type: " + msg.getClass().getName());
     }

     int length = data.readableBytes();

     int b0 = 0;
     if (msg.isFinalFragment()) {
         b0 |= 1 << 7;
     }
     b0 |= msg.rsv() % 8 << 4;
     b0 |= opcode % 128;

     if (opcode == OPCODE_PING && length > 125) {
         throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was " + length);
     }

     boolean release = true;
     ByteBuf buf = null;
     try {
         int maskLength = maskPayload ? 4 : 0;
         if (length <= 125) {
             int size = 2 + maskLength;
             if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) {
                 size += length;
             }
             buf = ctx.alloc().buffer(size);
             buf.writeByte(b0);
             byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length);
             buf.writeByte(b);
         } else if (length <= 0xFFFF) {
             int size = 4 + maskLength;
             if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) {
                 size += length;
             }
             buf = ctx.alloc().buffer(size);
             buf.writeByte(b0);
             buf.writeByte(maskPayload ? 0xFE : 126);
             buf.writeByte(length >>> 8 & 0xFF);
             buf.writeByte(length & 0xFF);
         } else {
             int size = 10 + maskLength;
             if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) {
                 size += length;
             }
             buf = ctx.alloc().buffer(size);
             buf.writeByte(b0);
             buf.writeByte(maskPayload ? 0xFF : 127);
             buf.writeLong(length);
         }

         // Write payload
         if (maskPayload) {
             int random = (int) (Math.random() * Integer.MAX_VALUE);
             mask = ByteBuffer.allocate(4).putInt(random).array();
             buf.writeBytes(mask);

             ByteOrder srcOrder = data.order();
             ByteOrder dstOrder = buf.order();

             int counter = 0;
             int i = data.readerIndex();
             int end = data.writerIndex();

             if (srcOrder == dstOrder) {
                 int intMask = ((mask[0] & 0xFF) << 24)
                     | ((mask[1] & 0xFF) << 16)
                     | ((mask[2] & 0xFF) << 8)
                     | (mask[3] & 0xFF);

                 if (srcOrder == ByteOrder.LITTLE_ENDIAN) {
                     intMask = Integer.reverseBytes(intMask);
                 }

                 for (; i + 3 < end; i += 4) {
                     int intData = data.getInt(i);
                     buf.writeInt(intData ^ intMask);
                 }
             }
             for (; i < end; i++) {
                 byte byteData = data.getByte(i);
                 buf.writeByte(byteData ^ mask[counter++ % 4]);
             }
             out.add(buf);
         } else {
             if (buf.writableBytes() >= data.readableBytes()) {
                 buf.writeBytes(data);
                 out.add(buf);
             } else {
                 out.add(buf);
                 out.add(data.retain());
             }
         }
         release = false;
     } finally {
         if (release && buf != null) {
             buf.release();
         }
     }
 }

五 Js實現的客戶端

  • test.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>WebSocket</title>
</head>
<body>
    <div id="output"></div>
    <script>
        function setup(){
            output = document.getElementById("output");
            ws = new WebSocket("ws://localhost:9999/echo");

            ws.onopen = function(e){
                log('Connected');
                sendMessage('hello');
            }

            ws.onclose = function(e){
                log("Disconnected:"+e.reason);
            }

            ws.onerror = function(e){
                log("Error ");
            }

            ws.onmessage = function(e){
                log("Message received:"+e.data);
                ws.close();
            }
        }    

        function sendMessage(msg){
            ws.send(msg);
            log('Message sent');
        }

        function log(s){
            var p = document.createElement('p');
            p.style.wordWrap = 'break-word';
            p.textContent = s;
            output.appendChild(p);

            console.log(s);
        }

        setup();
    </script>
</body>
</html>

參考:

  • RFC 6455
  • 《HTML5 WebSocket權威指南》
  • 某視訊教程
  • Netty原始碼