WebSocket+Netty實現聊天室
伺服器:
package com.xuan.chat.demo; import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor; public class ChatServer { //private final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private final ChannelGroup group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final int port; public ChatServer(int port) { this.port = port; } public void run() throws Exception { // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChatServerInitializer(group)); b.bind(port).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port=8088; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8088; } System.out.println("open chat"); new ChatServer(8088).run();//ִ��echo } ///////////////////////////////////////////////////// /* private final ChannelGroup group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private Channel channel; public ChannelFuture start(InetSocketAddress address){ ServerBootstrap boot = new ServerBootstrap(); boot.group(workerGroup).channel(NioServerSocketChannel.class).childHandler(createInitializer(group)); ChannelFuture f = boot.bind(address).syncUninterruptibly(); channel = f.channel(); return f; } protected ChannelHandler createInitializer(ChannelGroup group2) { return new ChatServerInitializer(group2); } public void destroy(){ if(channel != null) channel.close(); group.close(); workerGroup.shutdownGracefully(); } public static void main(String[] args) { final ChatServer server = new ChatServer(); ChannelFuture f = server.start(new InetSocketAddress(2048)); System.out.println("server start................"); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { server.destroy(); } }); f.channel().closeFuture().syncUninterruptibly(); }*/ }
初始化:
package com.xuan.chat.demo; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; public class ChatServerInitializer extends ChannelInitializer<SocketChannel> { private final ChannelGroup group; public ChatServerInitializer(ChannelGroup group) { super(); this.group = group; } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(64*1024)); pipeline.addLast(new HttpRequestHandler("/ws")); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new TextWebSocketFrameHandler(group)); } }
握手處理:
WebSocket處理:TextWebSocketFrameHandlerpackage com.xuan.chat.demo; import java.io.RandomAccessFile; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.DefaultFileRegion; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedNioFile; public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String wsUri; public HttpRequestHandler(String wsUri) { super(); this.wsUri = wsUri; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { if(wsUri.equalsIgnoreCase(msg.getUri())){ ctx.fireChannelRead(msg.retain()); }else{ if(HttpHeaders.is100ContinueExpected(msg)){ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } RandomAccessFile file = new RandomAccessFile(HttpRequestHandler.class.getResource("/").getPath()+"/index.html", "r"); HttpResponse response = new DefaultHttpResponse(msg.getProtocolVersion(), HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html;charset=UTF-8"); boolean isKeepAlive = HttpHeaders.isKeepAlive(msg); if(isKeepAlive){ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length()); response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); } ctx.write(response); if(ctx.pipeline().get(SslHandler.class) == null){ ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length())); }else{ ctx.write(new ChunkedNioFile(file.getChannel())); } ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); if(isKeepAlive == false){ future.addListener(ChannelFutureListener.CLOSE); } file.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(System.err); } }
package com.xuan.chat.demo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelGroup group;
public TextWebSocketFrameHandler(ChannelGroup group) {
super();
this.group = group;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if(evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){
ctx.pipeline().remove(HttpRequestHandler.class);
String msg=ctx.channel().toString();
String name=ctx.name();
System.out.println("======="+msg+"========="+name);
msg=msg.substring(msg.indexOf("/")+1, msg.length());
msg=msg.substring(0, msg.indexOf(":"));
System.out.println("======="+msg+"========="+name);
group.writeAndFlush(new TextWebSocketFrame(""+msg+"使用者進入聊天室:"));
group.add(ctx.channel());
}else{
super.userEventTriggered(ctx, evt);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame msg) throws Exception {
System.out.println("-=-=-="+msg.retain());
group.writeAndFlush(msg.retain());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
cause.printStackTrace();
}
public static void main(String args[]) {
String msg="[id: 0x68f8dbad, /192.168.1.119:61928 => /192.168.1.119:8088]";
int idx = msg.indexOf(":")-1;
msg=msg.substring(msg.indexOf("/")+1, msg.length());
msg=msg.substring(0, msg.indexOf(":"));
System.out.println(msg);
}
}
(改進過的TextWebSocketFrameHandler)與上面的類相同
package com.xuan.chat.demo;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.concurrent.ImmediateEventExecutor;
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelGroup group;
public TextWebSocketFrameHandler(ChannelGroup group) {
super();
this.group = group;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if(evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){
ctx.pipeline().remove(HttpRequestHandler.class);
String msg=ctx.channel().toString();
String name=ctx.name();
System.out.println("======="+msg+"========="+name);
msg=msg.substring(msg.indexOf("/")+1, msg.length());
msg=msg.substring(0, msg.indexOf(":"));
System.out.println("======="+msg+"========="+name);
group.writeAndFlush(new TextWebSocketFrame(""+msg+"使用者進入聊天室:"));
group.add(ctx.channel());
}else{
super.userEventTriggered(ctx, evt);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame msg) throws Exception {
Channel incoming = ctx.channel();
for (Channel channel : group) {
if (channel != incoming){
channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
} else {
channel.writeAndFlush(new TextWebSocketFrame("[我]" + msg.text() ));
}
}
//group.writeAndFlush(msg.retain());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
cause.printStackTrace();
}
public static void main(String args[]) {
String msg="[id: 0x68f8dbad, /192.168.1.119:61928 => /192.168.1.119:8088]";
int idx = msg.indexOf(":")-1;
msg=msg.substring(msg.indexOf("/")+1, msg.length());
msg=msg.substring(0, msg.indexOf(":"));
System.out.println(msg);
}
}
測試跳轉類:
package com.xuan.chat.demo;
import javax.servlet.http.HttpServletRequest;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.ModelAndView;
@Controller
public class ChatTestController {
@RequestMapping(value="/netty/chat/demo")
public ModelAndView name(HttpServletRequest request) {
ModelAndView mav = new ModelAndView();
mav.setViewName("chat/index");
return mav;
}
}
WebSoket+js處理頁面(重要):
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<%
String path = request.getContextPath();
String basePath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/";
%>
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<base href="<%=basePath%>">
<title>My JSP 'index.jsp' starting page</title>
<meta http-equiv="pragma" content="no-cache">
<meta http-equiv="cache-control" content="no-cache">
<meta http-equiv="expires" content="0">
<meta http-equiv="keywords" content="keyword1,keyword2,keyword3">
<meta http-equiv="description" content="This is my page">
<!--
<link rel="stylesheet" type="text/css" href="styles.css">
-->
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://192.168.1.119:8088/ws");
socket.onmessage = function(event) {
console.log(event);
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data
};
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "連線開啟!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "連線被關閉";
};
} else {
alert("你的瀏覽器不支援!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("連線沒有開啟.");
}
}
</script>
<form onsubmit="return false;">
<input type="text" name="message" value="Hello, World!"><input
type="button" value="傳送訊息"
onclick="send(this.form.message.value)">
<h3>輸出:</h3>
<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空">
</form>
</body>
</html>
執行效果:(啟動專案,啟動聊天室伺服器)
參考資料:
WebSocket
WebSocket 通過“Upgrade handshake(升級握手)”從標準的 HTTP 或HTTPS 協議轉為 WebSocket。因此,使用 WebSocket 的應用程式將始終以 HTTP/S 開始,然後進行升級。在什麼時候發生這種情況取決於具體的應用;它可以是在啟動時,或當一個特定的 URL 被請求時。
在我們的應用中,當 URL 請求以“/ws”結束時,我們才升級協議為WebSocket。否則,伺服器將使用基本的 HTTP/S。一旦升級連線將使用的WebSocket 傳輸所有資料。
整個伺服器邏輯如下:
1.客戶端/使用者連線到伺服器並加入聊天
2.HTTP 請求頁面或 WebSocket 升級握手
3.伺服器處理所有客戶端/使用者
4.響應 URI “/”的請求,轉到預設 html 頁面
5.如果訪問的是 URI“/ws” ,處理 WebSocket 升級握手
6.升級握手完成後 ,通過 WebSocket 傳送聊天訊息
服務端
讓我們從處理 HTTP 請求的實現開始。
處理 HTTP 請求
HttpRequestHandler.java
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //1
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
try {
String path = location.toURI() + "WebsocketChatClient.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.getUri())) {
ctx.fireChannelRead(request.retain()); //2
} else {
if (HttpHeaders.is100ContinueExpected(request)) {
send100Continue(ctx); //3
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4
HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) { //5
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response); //6
if (ctx.pipeline().get(SslHandler.class) == null) { //7
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
} else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); //8
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE); //9
}
file.close();
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"異常");
// 當出現異常就關閉連線
cause.printStackTrace();
ctx.close();
}
}
1.擴充套件 SimpleChannelInboundHandler 用於處理 FullHttpRequest資訊
2.如果請求是 WebSocket 升級,遞增引用計數器(保留)並且將它傳遞給在 ChannelPipeline 中的下個 ChannelInboundHandler
3.處理符合 HTTP 1.1的 “100 Continue” 請求
4.讀取預設的 WebsocketChatClient.html 頁面
5.判斷 keepalive 是否在請求頭裡面
6.寫 HttpResponse 到客戶端
7.寫 index.html 到客戶端,判斷 SslHandler 是否在 ChannelPipeline 來決定是使用 DefaultFileRegion 還是 ChunkedNioFile
8.寫並重新整理 LastHttpContent 到客戶端,標記響應完成
9.如果 keepalive 沒有要求,當寫完成時,關閉 Channel
HttpRequestHandler 做了下面幾件事,
- 如果該 HTTP 請求被髮送到URI “/ws”,呼叫 FullHttpRequest 上的 retain(),並通過呼叫 fireChannelRead(msg) 轉發到下一個 ChannelInboundHandler。retain() 是必要的,因為 channelRead() 完成後,它會呼叫 FullHttpRequest 上的 release() 來釋放其資源。 (請參考我們先前的 SimpleChannelInboundHandler 在第6章中討論)
- 如果客戶端傳送的 HTTP 1.1 頭是“Expect: 100-continue” ,將傳送“100 Continue”的響應。
- 在 頭被設定後,寫一個 HttpResponse 返回給客戶端。注意,這是不是 FullHttpResponse,唯一的反應的第一部分。此外,我們不使用 writeAndFlush() 在這裡 - 這個是在最後完成。
- 如果沒有加密也不壓縮,要達到最大的效率可以是通過儲存 index.html 的內容在一個 DefaultFileRegion 實現。這將利用零拷貝來執行傳輸。出於這個原因,我們檢查,看看是否有一個 SslHandler 在 ChannelPipeline 中。另外,我們使用 ChunkedNioFile。
- 寫 LastHttpContent 來標記響應的結束,並終止它
- 如果不要求 keepalive ,新增 ChannelFutureListener 到 ChannelFuture 物件的最後寫入,並關閉連線。注意,這裡我們呼叫 writeAndFlush() 來重新整理所有以前寫的資訊。
處理 WebSocket frame
WebSockets 在“幀”裡面來發送資料,其中每一個都代表了一個訊息的一部分。一個完整的訊息可以利用了多個幀。
WebSocket “Request for Comments” (RFC) 定義了六中不同的 frame; Netty 給他們每個都提供了一個 POJO 實現 ,而我們的程式只需要使用下面4個幀型別:
- CloseWebSocketFrame
- PingWebSocketFrame
- PongWebSocketFrame
- TextWebSocketFrame
在這裡我們只需要顯示處理 TextWebSocketFrame,其他的會由 WebSocketServerProtocolHandler 自動處理。
下面程式碼展示了 ChannelInboundHandler 處理 TextWebSocketFrame,同時也將跟蹤在 ChannelGroup 中所有活動的 WebSocket 連線
TextWebSocketFrameHandler.java
public class TextWebSocketFrameHandler extends
SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame msg) throws Exception { // (1)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
if (channel != incoming){
channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
} else {
channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() ));
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
}
channels.add(ctx.channel());
System.out.println("Client:"+incoming.remoteAddress() +"加入");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 離開"));
}
System.out.println("Client:"+incoming.remoteAddress() +"離開");
channels.remove(ctx.channel());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"線上");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"掉線");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"異常");
// 當出現異常就關閉連線
cause.printStackTrace();
ctx.close();
}
}
1.TextWebSocketFrameHandler 繼承自 SimpleChannelInboundHandler,這個類實現了 ChannelInboundHandler介面,ChannelInboundHandler 提供了許多事件處理的介面方法,然後你可以覆蓋這些方法。現在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實現介面方法。
2.覆蓋了 handlerAdded() 事件處理方法。每當從服務端收到新的客戶端連線時,客戶端的 Channel 存入 ChannelGroup列表中,並通知列表中的其他客戶端 Channel
3.覆蓋了 handlerRemoved() 事件處理方法。每當從服務端收到客戶端斷開時,客戶端的 Channel 移除 ChannelGroup 列表中,並通知列表中的其他客戶端 Channel
4.覆蓋了 channelRead0() 事件處理方法。每當從服務端讀到客戶端寫入資訊時,將資訊轉發給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時,需要把 channelRead0() 重新命名為messageReceived()
5.覆蓋了 channelActive() 事件處理方法。服務端監聽到客戶端活動
6.覆蓋了 channelInactive() 事件處理方法。服務端監聽到客戶端不活動
7.exceptionCaught() 事件處理方法是當出現 Throwable 物件才會被呼叫,即當 Netty 由於 IO 錯誤或者處理器在處理事件時丟擲的異常時。在大部分情況下,捕獲的異常應該被記錄下來並且把關聯的 channel 給關閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實現,比如你可能想在關閉連線之前傳送一個錯誤碼的響應訊息。
上面顯示了 TextWebSocketFrameHandler 僅作了幾件事:
- 當WebSocket 與新客戶端已成功握手完成,通過寫入資訊到 ChannelGroup 中的 Channel 來通知所有連線的客戶端,然後新增新 Channel 到 ChannelGroup
- 如果接收到 TextWebSocketFrame,呼叫 retain() ,並將其寫、重新整理到 ChannelGroup,使所有連線的 WebSocket Channel 都能接收到它。和以前一樣,retain() 是必需的,因為當 channelRead0()返回時,TextWebSocketFrame 的引用計數將遞減。由於所有操作都是非同步的,writeAndFlush() 可能會在以後完成,我們不希望它來訪問無效的引用。
由於 Netty 處理了其餘大部分功能,唯一剩下的我們現在要做的是初始化 ChannelPipeline 給每一個建立的新的 Channel 。做到這一點,我們需要一個ChannelInitializer
WebsocketChatServerInitializer.java
public class WebsocketChatServerInitializer extends
ChannelInitializer<SocketChannel> { //1
@Override
public void initChannel(SocketChannel ch) throws Exception {//2
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(64*1024));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
1.擴充套件 ChannelInitializer
2.新增 ChannelHandler 到 ChannelPipeline
initChannel() 方法設定 ChannelPipeline 中所有新註冊的 Channel,安裝所有需要的 ChannelHandler。
WebsocketChatServer.java
編寫一個 main() 方法來啟動服務端。
public class WebsocketChatServer {
private int port;
public WebsocketChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new WebsocketChatServerInitializer()) //(4)
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
System.out.println("WebsocketChatServer 啟動了");
// 繫結埠,開始接收進來的連線
ChannelFuture f = b.bind(port).sync(); // (7)
// 等待伺服器 socket 關閉 。
// 在這個例子中,這不會發生,但你可以優雅地關閉你的伺服器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("WebsocketChatServer 關閉了");
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new WebsocketChatServer(port).run();
}
}
1.NioEventLoopGroup 是用來處理I/O操作的多執行緒事件迴圈器,Netty 提供了許多不同的 EventLoopGroup 的實現用來處理不同的傳輸。在這個例子中我們實現了一個服務端的應用,因此會有2個 NioEventLoopGroup 會被使用。第一個經常被叫做‘boss’,用來接收進來的連線。第二個經常被叫做‘worker’,用來處理已經被接收的連線,一旦‘boss’接收到連線,就會把連線資訊註冊到‘worker’上。如何知道多少個執行緒已經被使用,如何對映到已經建立的Channel上都需要依賴於 EventLoopGroup 的實現,並且可以通過建構函式來配置他們的關係。
2.ServerBootstrap 是一個啟動 NIO 服務的輔助啟動類。你可以在這個服務中直接使用 Channel,但是這會是一個複雜的處理過程,在很多情況下你並不需要這樣做。
3.這裡我們指定使用 NioServerSocketChannel 類來舉例說明一個新的 Channel 如何接收進來的連線。
4.這裡的事件處理類經常會被用來處理一個最近的已經接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。也許你想通過增加一些處理類比如 SimpleChatServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網路程式。當你的程式變的複雜時,可能你會增加更多的處理類到 pipline 上,然後提取這些匿名類到最頂層的類上。
5.你可以設定這裡指定的 Channel 實現的配置引數。我們正在寫一個TCP/IP 的服務端,因此我們被允許設定 socket 的引數選項比如tcpNoDelay 和 keepAlive。請參考 ChannelOption 和詳細的 ChannelConfig 實現的介面文件以此可以對ChannelOption 的有一個大概的認識。
6.option() 是提供給NioServerSocketChannel 用來接收進來的連線。childOption() 是提供給由父管道 ServerChannel接收到的連線,在這個例子中也是 NioServerSocketChannel。
7.我們繼續,剩下的就是繫結埠然後啟動服務。這裡我們在機器上綁定了機器所有網絡卡上的 8080 埠。當然現在你可以多次呼叫 bind() 方法(基於不同繫結地址)。
恭喜!你已經完成了基於 Netty 聊天服務端程式。
客戶端
在程式的 resources 目錄下,我們建立一個 WebsocketChatClient.html 頁面來作為客戶端
WebsocketChatClient.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8080/ws");
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data
};
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "連線開啟!";
};
socket.onclose = <