1. 程式人生 > >Netty(WebSocket聊天器)

Netty(WebSocket聊天器)

處理 HTTP請求    

    如果被請求的URL以/ws結尾,那麼將會把該協議升級為WebSocket;否則,伺服器將使用基本的HTTP/S。在連線已經升級完成之後,所有資料都將會使用WebSocket進行傳輸。

package netty.in.action.websocket;

import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final String wsUri;


    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            //如果請求了WebSocket協議升級,則增加引用計數(呼叫retain()方 法 ),並將它傳遞給下一個ChannelInboundHandler
            ctx.fireChannelRead(request.retain());
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) {//處理100 Continue請求以符合HTTP1.1規範
                send100Continue(ctx);
            }
            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
            boolean keepAlive = HttpHeaders.isKeepAlive(request);
            if (keepAlive) {//如果請求了keep-alive,則新增所需要的HTTP頭資訊

                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 1024);
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }

            ctx.write(response);//將HttpResponse寫到客戶端

            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);//寫LastHttpContent並沖刷至客戶端
            if (!keepAlive) {//如果沒有請求keep-alive,則在寫操作完成後關閉Channel
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

    如果該HTTP請求指向了地址為/ws的URI,那麼HttpRequestHandler將呼叫FullHttpRequest物件上的retain()方法,並通過呼叫fireChannelRead(msg)方法將它轉發給下一個ChannelInboundHandler。之所以需要呼叫retain()方法,是因為呼叫channelRead()方法完成之後,它將呼叫FullHttpRequest物件上的release()方法以釋放它的資源。

    如果客戶端傳送了HTTP1.1的HTTP頭資訊Expect: 100-continue,那麼HttpRequestHandler將會發送一個100 Continue響應。在該HTTP頭資訊被設定之後,HttpRequestHandler將會寫回一個HttpResponse給客戶端。這不是一個FullHttpResponse,因為它只是響應的第一個部分。此外,這裡也不會呼叫writeAndFlush()方法,在結束的時候才會呼叫。

    如果不需要加密和壓縮,那麼可以通過將index.html的內容儲存到DefaultFileRegion中來達到最佳效率。這將會利用零拷貝特性來進行內容的傳輸。否則,使用ChunkedNioFile。

    HttpRequestHandler將寫一個LastHttpContent來標記響應的結束。如果沒有請求keep-alive,那麼HttpRequestHandler將會新增一個ChannelFutureListener到最後一次寫出動作的ChannelFuture,並關閉該連線。在這裡,你將呼叫writeAndFlush()方法以沖刷所有之前寫入的訊息。這部分程式碼代表了聊天伺服器的第一個部分,它管理純粹的HTTP請求和響應。

處理 WebSocket幀 

幀型別 描述
BinaryWebSocketFrame 包含了二進位制資料
TextWebSocketFrame 包含了文字資料
ContinuationWebSocketFrame 包含屬於上一個BinaryWebSocketFrame或TextWebSocket-Frame的文字資料或者二進位制資料
ContinuationWebSocketFrame 表示一個CLOSE請求,包含一個關閉的狀態碼和關閉的原因
PingWebSocketFrame 請求傳輸一個PongWebSocketFrame
PongWebSocketFrame 作為一個對於PingWebSocketFrame的響應被髮送

    TextWebSocketFrame是我們唯一真正需要處理的幀型別。為了符合WebSocket  RFC,Netty提供了WebSocketServerProtocolHandler來處理其他型別的幀。

package netty.in.action.websocket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private final ChannelGroup group;

    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            ctx.pipeline().remove(HttpRequestHandler.class);//如果該事件表示握手成功,則從該Channelipeline中移除HttpRequestHandler,因為將不會接收到任何HTTP訊息了
            group.writeAndFlush(new TextWebSocketFrame("客戶端 " + ctx.channel() + " 連線"));//通知所有已經連線的WebSocket客戶端新的客戶端已經連線上了
            group.add(ctx.channel());//將新的WebSocket Channel新增到ChannelGroup中,以便它可以接收到所有的訊息
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        group.writeAndFlush(msg.retain());//增加訊息的引用計數,並將它寫到ChannelGroup中所有已經連線的客戶端
    }
}

    TextWebSocketFrameHandler只有一組非常少量的責任。當和新客戶端的WebSocket握手成功完成之後,它將通過把通知訊息寫到ChannelGroup中的所有Channel來通知所有已經連線的客戶端,然後它將把這個新Channel加入到該ChannelGroup中。

    如果接收到了TextWebSocketFrame訊息,TextWebSocketFrameHandler將呼叫TextWebSocketFrame訊息上的retain()方法,並使用writeAndFlush()方法來將它傳輸給ChannelGroup,以便所有已經連線的WebSocketChannel都將接收到它。

    和之前一樣,對於retain()方法的呼叫是必需的,因為當channelRead0()方法返回時,TextWebSocketFrame的引用計數將會被減少。由於所有的操作都是非同步的,因此,writeAndFlush()方法可能會在channelRead0()方法返回之後完成,而且它絕對不能訪問一個已經失效的引用。

初始化ChannelPipeline

package netty.in.action.websocket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ChatServerInitializer extends ChannelInitializer<Channel> {
    private final ChannelGroup group;

    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel ch) throws
            Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());//建立HttpServer編解碼器
        pipeline.addLast(new ChunkedWriteHandler());//添加了對非同步編寫大型資料流的支援
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));//聚合 HTTP訊息
        pipeline.addLast(new HttpRequestHandler("/zzf"));//處理Http請求
        pipeline.addLast(new WebSocketServerProtocolHandler("/zzf"));//它處理websocket握手以及控制幀的處理(關閉,Ping, Pong)。
        pipeline.addLast(new TextWebSocketFrameHandler(group));//處理文字訊息
    }
}
ChannelHandler 職責
HttpServerCodec 將位元組解碼為HttpRequest、HttpContent和LastHttpContent。並將HttpRequest、HttpContent和LastHttpContent編碼為位元組
ChunkedWriteHandler 寫入一個檔案的內容
HttpObjectAggregator 將一個HttpMessage和跟隨它的多個HttpContent聚合為單個FullHttpRequest或者FullHttpResponse(取決於它是被用來處理請求還是響應)。安裝了這個之後,ChannelPipeline中的下一個ChannelHandler將只會收到完整的HTTP請求或響應
HttpRequestHandler 處理FullHttpRequest(那些不傳送到/ws URI的請求)
WebSocketServerProtocolHandler 按照WebSocket規範的要求,處理WebSocket升級握手、PingWebSocketFrame、PongWebSocketFrame和CloseWebSocketFrame
TextWebSocketFrameHandler 處理TextWebSocketFrame和握手完成事件

    Netty的WebSocketServerProtocolHandler處理了所有委託管理的WebSocket幀型別以及升級握手本身。如果握手成功,那麼所需的ChannelHandler將會被新增到ChannelPipeline中,而那些不再需要的ChannelHandler則將會被移除。

WebSocket協議升級之前的ChannelPipeline的狀態如圖:

    當WebSocket協議升級完成之後,WebSocketServerProtocolHandler將會把Http  -RequestDecoder替換為WebSocketFrameDecoder,把HttpResponseEncoder替換為WebSocketFrameEncoder。為了效能最大化,它將移除任何不再被WebSocket連線所需要的ChannelHandler。

引導

package netty.in.action.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;

import java.net.InetSocketAddress;

public class ChatServer {
    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture start(InetSocketAddress address) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(createInitializer(channelGroup));
        ChannelFuture future = bootstrap.bind(address);
        future.syncUninterruptibly();
        channel = future.channel();
        return future;
    }

    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
        return new ChatServerInitializer(group);
    }

    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {

        int port = Integer.parseInt("8888");
        final ChatServer endpoint = new ChatServer();
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void
            run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

然後需要一個html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8888/zzf");
            socket.onopen = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = "連線開啟!";
            };
            socket.onclose = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + "連線被關閉";
            };
            socket.onmessage = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + '\n' + event.data;
            };
        } else {
            alert("你的瀏覽器不支援 WebSocket!");
        }
 
        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("連線沒有開啟.");
            }
        }
    </script>
    <form onsubmit="return false;">
        <h3>WebSocket 聊天室:</h3>
        <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
        <br> 
        <input type="text" name="message"  style="width: 300px" value="Welcome to www.waylau.com">
        <input type="button" value="傳送訊息" onclick="send(this.form.message.value)">
        <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天記錄">
    </form>
    <br> 
    <br> 
</body>
</html>

進行加密

package netty.in.action.websocket;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;

import javax.net.ssl.SSLEngine;

public class SecureChatServerInitializer extends ChatServerInitializer {
    private final SslContext context;

    public SecureChatServerInitializer(ChannelGroup group, SslContext context) {
        super(group);
        this.context = context;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        super.initChannel(ch);//呼叫父類的initChannel()方法
        SSLEngine engine = context.newEngine(ch.alloc());
        engine.setUseClientMode(false);
        ch.pipeline().addFirst(new SslHandler(engine));//將SslHandler新增到ChannelPipeline中
    }
}
package netty.in.action.websocket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;

import java.net.InetSocketAddress;

public class SecureChatServer extends ChatServer {
    private final SslContext context;

    public SecureChatServer(SslContext context) {
        this.context = context;
    }

    @Override
    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
        return new SecureChatServerInitializer(group, context);//返回之前建立的SecureChatServer-Initializer以啟用加密
    }

    public static void main(String[] args) throws Exception {

        int port = Integer.parseInt("8888");
        SelfSignedCertificate cert = new SelfSignedCertificate();
        SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());
        final SecureChatServer endpoint = new SecureChatServer(context);
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void
            run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

參考《Netty實戰》