Netty(WebSocket聊天器)
處理 HTTP請求
如果被請求的URL以/ws結尾,那麼將會把該協議升級為WebSocket;否則,伺服器將使用基本的HTTP/S。在連線已經升級完成之後,所有資料都將會使用WebSocket進行傳輸。
package netty.in.action.websocket; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.codec.http.*; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedNioFile; import java.io.File; import java.io.RandomAccessFile; import java.net.URISyntaxException; import java.net.URL; import java.nio.charset.Charset; public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String wsUri; public HttpRequestHandler(String wsUri) { this.wsUri = wsUri; } @Override public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { if (wsUri.equalsIgnoreCase(request.getUri())) { //如果請求了WebSocket協議升級,則增加引用計數(呼叫retain()方 法 ),並將它傳遞給下一個ChannelInboundHandler ctx.fireChannelRead(request.retain()); } else { if (HttpHeaders.is100ContinueExpected(request)) {//處理100 Continue請求以符合HTTP1.1規範 send100Continue(ctx); } HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); boolean keepAlive = HttpHeaders.isKeepAlive(request); if (keepAlive) {//如果請求了keep-alive,則新增所需要的HTTP頭資訊 response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 1024); response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); } ctx.write(response);//將HttpResponse寫到客戶端 ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);//寫LastHttpContent並沖刷至客戶端 if (!keepAlive) {//如果沒有請求keep-alive,則在寫操作完成後關閉Channel future.addListener(ChannelFutureListener.CLOSE); } } } private static void send100Continue(ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
如果該HTTP請求指向了地址為/ws的URI,那麼HttpRequestHandler將呼叫FullHttpRequest物件上的retain()方法,並通過呼叫fireChannelRead(msg)方法將它轉發給下一個ChannelInboundHandler。之所以需要呼叫retain()方法,是因為呼叫channelRead()方法完成之後,它將呼叫FullHttpRequest物件上的release()方法以釋放它的資源。
如果客戶端傳送了HTTP1.1的HTTP頭資訊Expect: 100-continue,那麼HttpRequestHandler將會發送一個100 Continue響應。在該HTTP頭資訊被設定之後,HttpRequestHandler將會寫回一個HttpResponse給客戶端。這不是一個FullHttpResponse,因為它只是響應的第一個部分。此外,這裡也不會呼叫writeAndFlush()方法,在結束的時候才會呼叫。
如果不需要加密和壓縮,那麼可以通過將index.html的內容儲存到DefaultFileRegion中來達到最佳效率。這將會利用零拷貝特性來進行內容的傳輸。否則,使用ChunkedNioFile。
HttpRequestHandler將寫一個LastHttpContent來標記響應的結束。如果沒有請求keep-alive,那麼HttpRequestHandler將會新增一個ChannelFutureListener到最後一次寫出動作的ChannelFuture,並關閉該連線。在這裡,你將呼叫writeAndFlush()方法以沖刷所有之前寫入的訊息。這部分程式碼代表了聊天伺服器的第一個部分,它管理純粹的HTTP請求和響應。
處理 WebSocket幀
幀型別 | 描述 |
BinaryWebSocketFrame | 包含了二進位制資料 |
TextWebSocketFrame | 包含了文字資料 |
ContinuationWebSocketFrame | 包含屬於上一個BinaryWebSocketFrame或TextWebSocket-Frame的文字資料或者二進位制資料 |
ContinuationWebSocketFrame | 表示一個CLOSE請求,包含一個關閉的狀態碼和關閉的原因 |
PingWebSocketFrame | 請求傳輸一個PongWebSocketFrame |
PongWebSocketFrame | 作為一個對於PingWebSocketFrame的響應被髮送 |
TextWebSocketFrame是我們唯一真正需要處理的幀型別。為了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler來處理其他型別的幀。
package netty.in.action.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelGroup group;
public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().remove(HttpRequestHandler.class);//如果該事件表示握手成功,則從該Channelipeline中移除HttpRequestHandler,因為將不會接收到任何HTTP訊息了
group.writeAndFlush(new TextWebSocketFrame("客戶端 " + ctx.channel() + " 連線"));//通知所有已經連線的WebSocket客戶端新的客戶端已經連線上了
group.add(ctx.channel());//將新的WebSocket Channel新增到ChannelGroup中,以便它可以接收到所有的訊息
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
group.writeAndFlush(msg.retain());//增加訊息的引用計數,並將它寫到ChannelGroup中所有已經連線的客戶端
}
}
TextWebSocketFrameHandler只有一組非常少量的責任。當和新客戶端的WebSocket握手成功完成之後,它將通過把通知訊息寫到ChannelGroup中的所有Channel來通知所有已經連線的客戶端,然後它將把這個新Channel加入到該ChannelGroup中。
如果接收到了TextWebSocketFrame訊息,TextWebSocketFrameHandler將呼叫TextWebSocketFrame訊息上的retain()方法,並使用writeAndFlush()方法來將它傳輸給ChannelGroup,以便所有已經連線的WebSocketChannel都將接收到它。
和之前一樣,對於retain()方法的呼叫是必需的,因為當channelRead0()方法返回時,TextWebSocketFrame的引用計數將會被減少。由於所有的操作都是非同步的,因此,writeAndFlush()方法可能會在channelRead0()方法返回之後完成,而且它絕對不能訪問一個已經失效的引用。
初始化ChannelPipeline
package netty.in.action.websocket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class ChatServerInitializer extends ChannelInitializer<Channel> {
private final ChannelGroup group;
public ChatServerInitializer(ChannelGroup group) {
this.group = group;
}
@Override
protected void initChannel(Channel ch) throws
Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());//建立HttpServer編解碼器
pipeline.addLast(new ChunkedWriteHandler());//添加了對非同步編寫大型資料流的支援
pipeline.addLast(new HttpObjectAggregator(64 * 1024));//聚合 HTTP訊息
pipeline.addLast(new HttpRequestHandler("/zzf"));//處理Http請求
pipeline.addLast(new WebSocketServerProtocolHandler("/zzf"));//它處理websocket握手以及控制幀的處理(關閉,Ping, Pong)。
pipeline.addLast(new TextWebSocketFrameHandler(group));//處理文字訊息
}
}
ChannelHandler | 職責 |
HttpServerCodec | 將位元組解碼為HttpRequest、HttpContent和LastHttpContent。並將HttpRequest、HttpContent和LastHttpContent編碼為位元組 |
ChunkedWriteHandler | 寫入一個檔案的內容 |
HttpObjectAggregator | 將一個HttpMessage和跟隨它的多個HttpContent聚合為單個FullHttpRequest或者FullHttpResponse(取決於它是被用來處理請求還是響應)。安裝了這個之後,ChannelPipeline中的下一個ChannelHandler將只會收到完整的HTTP請求或響應 |
HttpRequestHandler | 處理FullHttpRequest(那些不傳送到/ws URI的請求) |
WebSocketServerProtocolHandler | 按照WebSocket規範的要求,處理WebSocket升級握手、PingWebSocketFrame、PongWebSocketFrame和CloseWebSocketFrame |
TextWebSocketFrameHandler | 處理TextWebSocketFrame和握手完成事件 |
Netty的WebSocketServerProtocolHandler處理了所有委託管理的WebSocket幀型別以及升級握手本身。如果握手成功,那麼所需的ChannelHandler將會被新增到ChannelPipeline中,而那些不再需要的ChannelHandler則將會被移除。
WebSocket協議升級之前的ChannelPipeline的狀態如圖:
當WebSocket協議升級完成之後,WebSocketServerProtocolHandler將會把Http -RequestDecoder替換為WebSocketFrameDecoder,把HttpResponseEncoder替換為WebSocketFrameEncoder。為了效能最大化,它將移除任何不再被WebSocket連線所需要的ChannelHandler。
引導
package netty.in.action.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.net.InetSocketAddress;
public class ChatServer {
private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture start(InetSocketAddress address) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(createInitializer(channelGroup));
ChannelFuture future = bootstrap.bind(address);
future.syncUninterruptibly();
channel = future.channel();
return future;
}
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
return new ChatServerInitializer(group);
}
public void destroy() {
if (channel != null) {
channel.close();
}
channelGroup.close();
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
int port = Integer.parseInt("8888");
final ChatServer endpoint = new ChatServer();
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void
run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}
然後需要一個html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8888/zzf");
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "連線開啟!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "連線被關閉";
};
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data;
};
} else {
alert("你的瀏覽器不支援 WebSocket!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("連線沒有開啟.");
}
}
</script>
<form onsubmit="return false;">
<h3>WebSocket 聊天室:</h3>
<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
<br>
<input type="text" name="message" style="width: 300px" value="Welcome to www.waylau.com">
<input type="button" value="傳送訊息" onclick="send(this.form.message.value)">
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天記錄">
</form>
<br>
<br>
</body>
</html>
進行加密
package netty.in.action.websocket;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.SSLEngine;
public class SecureChatServerInitializer extends ChatServerInitializer {
private final SslContext context;
public SecureChatServerInitializer(ChannelGroup group, SslContext context) {
super(group);
this.context = context;
}
@Override
protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch);//呼叫父類的initChannel()方法
SSLEngine engine = context.newEngine(ch.alloc());
engine.setUseClientMode(false);
ch.pipeline().addFirst(new SslHandler(engine));//將SslHandler新增到ChannelPipeline中
}
}
package netty.in.action.websocket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.net.InetSocketAddress;
public class SecureChatServer extends ChatServer {
private final SslContext context;
public SecureChatServer(SslContext context) {
this.context = context;
}
@Override
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
return new SecureChatServerInitializer(group, context);//返回之前建立的SecureChatServer-Initializer以啟用加密
}
public static void main(String[] args) throws Exception {
int port = Integer.parseInt("8888");
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());
final SecureChatServer endpoint = new SecureChatServer(context);
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void
run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}
參考《Netty實戰》