netty的websocket服務端開發
前兩篇文章分析了node.js版本的socket.io的服務端與客戶端的程式碼,其實他們socket.io就是在websocket的基礎上進一步做了一層封裝,添加了一些心跳,重連線。。。
這篇文章就先來看看在netty中是如何建立websocket服務端的吧,先來回顧一下websocket建立連線以及通訊的過程:
(1)客戶端向服務端傳送http報文格式的請求,而且是GET方法的請求,不過這裡與普通的http請求有稍微不同的地方,那就是頭部connection欄位是Upgrade,然後又Upgrad欄位,值是websocket,其請求格式如下:
* GET /chat HTTP/1.1 * Host: server.example.com * Upgrade: websocket * Connection: Upgrade * Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== * Sec-WebSocket-Origin: http://example.com * Sec-WebSocket-Protocol: chat, superchat * Sec-WebSocket-Version: 13
(2)由伺服器端向客戶端返回特定格式的http報文,表示當前websocket建立,報文格式如下:
* HTTP/1.1 101 Switching Protocols
* Upgrade: websocket
* Connection: Upgrade
* Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
* Sec-WebSocket-Protocol: chat
(3)當連線建立以後,那麼雙方就可以通過剛剛建立連線用的socket來發送資料了,這裡傳送的資料必須經過websocket的幀格式的編碼,具體它的資訊就去看websocket的協議標準就知道了。。。
其實知道了整個建立連線以及通訊的過程,那麼就能夠知道如何在channel的pipeline上面安排各種handler的順序了,如下:
(1)首先要安排http報文的decode,aggregator以及encode的handler,因為最開始還是採用http通訊的
(2)接收到websocket的建立連線報文之後,通過一些處理按照規定的格式將http返回傳送給客戶端,那麼這就表示websocket的連線已經建立了
(3)移除前面設定的所有的http報文處理的handler,然後加上websocket的frame的decode以及encodehandler,用於將從客戶端收到的資料轉化為封裝好的格式,以及將要傳送的資料轉化成byte.
那麼接下來來看看具體是如何在netty中建立websocket的服務端的吧,直接上主程式的程式碼:
public class Fjs {
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); //這個是用於serversocketchannel的eventloop
EventLoopGroup workerGroup = new NioEventLoopGroup(); //這個是用於處理accept到的channel
try {
ServerBootstrap b = new ServerBootstrap(); //構建serverbootstrap物件
b.group(bossGroup, workerGroup); //設定時間迴圈物件,前者用來處理accept事件,後者用於處理已經建立的連線的io
b.channel(NioServerSocketChannel.class); //用它來建立新accept的連線,用於構造serversocketchannel的工廠類
b.childHandler(new ChannelInitializer<SocketChannel>(){ //為accept channel的pipeline預新增的inboundhandler
@Override //當新連線accept的時候,這個方法會呼叫
protected void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast(new ReadTimeoutHandler(10));
//ch.pipeline().addLast(new WriteTimeoutHandler(1));
ch.pipeline().addLast("decoder", new HttpRequestDecoder()); //用於解析http報文的handler
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); //用於將解析出來的資料封裝成http物件,httprequest什麼的
ch.pipeline().addLast("encoder", new HttpResponseEncoder()); //用於將response編碼成httpresponse報文傳送
ch.pipeline().addLast("handshake", new WebSocketServerProtocolHandler("", "", true)); //websocket的handler部分定義的,它會自己處理握手等操作
//ch.pipeline().addLast("chunkedWriter", new ChunkedWriteHandler());
//ch.pipeline().addLast(new HttpHanlder());
ch.pipeline().addLast(new WebSocketHandler());
}
});
//bind方法會建立一個serverchannel,並且會將當前的channel註冊到eventloop上面,
//會為其繫結本地埠,並對其進行初始化,為其的pipeline加一些預設的handler
ChannelFuture f = b.bind(80).sync();
f.channel().closeFuture().sync(); //相當於在這裡阻塞,直到serverchannel關閉
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String args[]) throws Exception {
new Fjs().run();
}
}
這裡可以看看在channel上面安排的handler的情況,主要是就是http部分的decode以及encode的handler,另外這裡有一個比較重要的handler,WebSocketServerProtocolHandler它對整個websocket的通訊進行了初始化,包括握手,以及以後的一些通訊控制。。。
然後再來看看我們自己定義的handler的處理吧:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
// TODO Auto-generated method stub
WebSocketFrame frame = (WebSocketFrame)msg;
ByteBuf buf = frame.content(); //真正的資料是放在buf裡面的
String aa = buf.toString(Charset.forName("utf-8")); //將資料按照utf-8的方式轉化為字串
System.out.println(aa);
WebSocketFrame out = new TextWebSocketFrame(aa); //建立一個websocket幀,將其傳送給客戶端
ctx.pipeline().writeAndFlush(out).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
// TODO Auto-generated method stub
ctx.pipeline().close(); //從pipeline上面關閉的時候,會關閉底層的chanel,而且會從eventloop上面取消註冊
}
});
}
其實這裡也就主要是channelRead方法,將從客戶端傳送過來的資料讀出來輸出,然後在返回一個字串就好了。。。還是比較的簡單。。
上面的兩段程式碼就基本上能夠使用netty來建立websocket規範的服務端了,可以看到整個用法還是很簡單的,這裡還有必要去看看WebSocketServerProtocolHandler做了些什麼事情。。。
//噹噹前這個handler被新增到pipeline之後會呼叫這個方法
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
// 這裡相當於要在當前的handler之前新增用於處理websocket建立連線時候的握手handler
ctx.pipeline().addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
new WebSocketServerProtocolHandshakeHandler(websocketPath, subprotocols, allowExtensions));
}
}
首先可以看到它覆蓋了這個方法,那麼在當前這個handler加入pipeline之後會在這個pipeline之前新增另外一個handler,這個handler是用於websocket的握手的。。。
好吧,那麼我們來看看這個用於websocket建立連線的握手的handler是怎麼個用法吧:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest req = (FullHttpRequest) msg;
if (req.getMethod() != GET) { //如果不是get請求,那麼就出錯了
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
return;
}
//建立愛你websocket進行連線握手的工廠類,因為不同版本的連線握手不太一樣
final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(ctx.pipeline(), req, websocketPath), subprotocols, allowExtensions);
final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); //這裡會根據不同的websocket版本來安排不同的握手handler
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
//其實這裡在將用於建立連線的http報文傳送回去之後,會將前面新增的http部分的handler都移除,然後加上用於decode和encode針對websocket幀的handler
final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); //這裡說白了就是進行握手,向客戶端返回用於建立連線的報文
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ctx.fireExceptionCaught(future.cause());
} else {
ctx.fireUserEventTriggered( //用於啟用握手已經完成的事件,可以讓使用者的程式碼收到通知
WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
}
}
});
WebSocketServerProtocolHandler.setHandshaker(ctx, handshaker); //儲存當前的shaker
ctx.pipeline().replace(this, "WS403Responder",
WebSocketServerProtocolHandler.forbiddenHttpRequestResponder()); //將當前這個handler替換,因為只有剛開始使用http協議進行通訊,接下來就沒有了
}
}
其實基本還是很簡單的,無非就是根據收到的http請求,獲取當前建立連線的websocket客戶端的版本資訊,然後通過版本獲取相應的用於握手的handler,然後進行相應的處理,將用於建立websocket連線的報文傳送給客戶端就可以了,不過這裡還有一些細節,那就是會在之後將處理http的handler移除,換成處理websocket幀的handler,因為以後的通訊就不是按照http來的了,而且會將當前這個handler也替換掉,畢竟以後就不會再用了嘛。。
那麼接下來回到WebSocketServerProtocolHandler,因為本身它是一個messagetomessagedecoder,那麼來看看它的decode方法:
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
if (frame instanceof CloseWebSocketFrame) { //如果是用於關閉
WebSocketServerHandshaker handshaker = getHandshaker(ctx);
frame.retain();
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame); //傳送用於關閉的websocket幀
return;
}
super.decode(ctx, frame, out);
}
上面獲取的shaker是在前面儲存的,畢竟不同版本的websocket對應不同的。。。再來看看它父類的方法吧:
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
if (frame instanceof PingWebSocketFrame) {
frame.content().retain();
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content()));
return;
}
if (frame instanceof PongWebSocketFrame) {
// Pong frames need to get ignored
return;
}
out.add(frame.retain());
}
那麼到這裡,整個websocket在netty中建立服務端的流程都已經很清楚了。。。
以後如果有時間的自己寫一個socket.io的吧,因為現在看到的那些都不太好用的樣子。。。