netty spring (整合)
Netty Spring (整合)
瘋狂創客圈 死磕Netty 系列之 11 【 部落格園 總入口 】
主要介紹的是SpringBoot整合Netty。在使用Netty之前,建議先了解Netty的基本原理,請參閱瘋狂創客圈。
這裡僅僅是使用Netty的第一步,這裡介紹一個最簡單的Demo——EchoServer,也就是回寫伺服器。就是無論客戶端發啥字串到伺服器端,伺服器端接收字串後直接回寫到客戶端。
本篇內容綱要
- 環境要求
- Spring +netty 伺服器端
- Spring +netty 客戶端
- Spring讀取配置檔案中的屬性值
原始碼 下載連結:
環境要求
- JDK::1.8
- Netty::4.0或以上(不包括5)
<java.version>1.8</java.version>
<springboot>1.5.9.RELEASE</springboot>
<netty.version>4.0.33.Final</netty.version>
##Spring +netty 伺服器端
回寫伺服器 Echo Server 程式主要由兩部分組成:
-
ServerBootstrap: 伺服器啟動引導器。負責配置伺服器端基本資訊,並且完成伺服器的啟動
-
EchoServerHandler: 回寫的業務邏輯處理器
ServerBootstrap
首先是編寫服務端的啟動類,程式碼中相應的註釋在寫得很詳細。 主要的步驟如下:
- 建立一個ServerBootstrap例項
- 建立一個EventLoopGroup來處理各種事件,如處理連結請求,傳送接收資料等。
- 設定本地監聽埠 InetSocketAddress( port)
- 設定 childHandler 來設定通道初始化類。並且在通道初始化時,加入回寫的業務邏輯處理器EchoServerHandler到伺服器通道的pipeline中 。childHandler 在通道初始化時,會被執行一次。
- 所有準備好之後呼叫ServerBootstrap.bind() 方法繫結 Server
不過需要注意的是,在不使用Spring的環境中,是通過main方法直接啟動服務端,因此是直接new一個處理器echoServerHandler 物件。而在和Spring 整合之後,我們需要將 echoServerHandler 處理器交給springBoot去管理。
ServerBootstrap 程式碼如下:
@Service("EchoServer")
public class EchoServer
{
// 伺服器埠
@Value("${server.port}")
private int port;
// 通過nio方式來接收連線和處理連線
private static EventLoopGroup boss = new NioEventLoopGroup();
private static EventLoopGroup work = new NioEventLoopGroup();
// 啟動引導器
private static ServerBootstrap b = new ServerBootstrap();
@Autowired
private EchoServerHandler echoServerHandler;
public void run()
{
try
{
b.group(boss, work);
// 設定nio型別的channel
b.channel(NioServerSocketChannel.class);
// 設定監聽埠
b.localAddress(new InetSocketAddress(port));
// 設定通道初始化
b.childHandler(new ChannelInitializer<SocketChannel>()
{
//有連線到達時會建立一個channel
protected void initChannel(SocketChannel ch) throws Exception
{
// pipeline管理channel中的Handler
// 在channel佇列中新增一個handler來處理業務
ch.pipeline().addLast("echoServerHandler",echoServerHandler);
}
});
// 配置完成,開始繫結server
// 通過呼叫sync同步方法阻塞直到繫結成功
ChannelFuture f = b.bind().sync();
System.out.println(EchoServer.class.getName() +
" started and listen on " + f.channel().localAddress());
// 監聽伺服器關閉事件
// 應用程式會一直等待,直到channel關閉
f.channel().closeFuture().sync();
} catch (Exception e)
{
e.printStackTrace();
} finally
{
// 關閉EventLoopGroup,釋放掉所有資源包括建立的執行緒
work.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
業務邏輯ServerHandler:
要想處理接收到的資料,我們必須繼承ChannelInboundHandlerAdapter介面,重寫裡面的channelRead方法,每當有資料到達,此方法就會被呼叫(一般是Byte型別陣列),我們就在這裡寫我們的業務邏輯:
@Service("echoServerHandler")
public class EchoServerHandler extends ChannelInboundHandlerAdapter
{
/**
* 建立連線時,傳送一條訊息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
System.out.println("連線的客戶端地址:" + ctx.channel().remoteAddress());
super.channelActive(ctx);
}
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
try
{
System.out.println("server received data :" + msg);
ctx.write(msg);//寫回資料,
} finally
{
ReferenceCountUtil.release(msg);
}
}
public void channelReadComplete(ChannelHandlerContext ctx)
{
//flush掉所有寫回的資料
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE); //當flush完成後關閉channel
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
//捕捉異常資訊
cause.printStackTrace();
//出現異常時關閉channel
ctx.close();
}
}
關於異常處理:
我們在上面程式中也重寫了exceptionCaught方法,這裡就是對當異常出現時的處理。
Spring +netty 客戶端
EchoClient 扮演如下角色:
- 連線到Server
- 向Server寫資料,
- 等待Server返回資料
回寫客戶端程式EchoClient 主要由兩部分組成:
- Bootstrap: 客戶端啟動引導器。負責配置客戶端基本資訊,並且完成客戶端的啟動
- EchoClientHandler : 客戶端業務邏輯處理器
EchoClient Bootstrap的過程:
和Server端類似,只不過Client端要同時指定連線主機的IP和Port。
- 建立一個Bootstrap例項
- 建立一個EventLoopGroup 來處理各種事件,如處理連結請求,傳送接收資料等。
- 定義需要連線到的遠端伺服器的InetSocketAddress,包含了IP+埠
- 設定 childHandler 來設定通道初始化類。並且在通道初始化時,加入客戶端的業務邏輯處理器echoClientHandler 到伺服器通道的pipeline中 。當連線完成之後,childHandler 會被執行一次 。
- 所有準備好之後呼叫 ServerBootstrap.connect() 方法連線Server
EchoClient Bootstrap的程式碼:
@Service("EchoClient")
public class EchoClient
{
// 伺服器ip地址
@Value("${server.ip}")
private String host;
// 伺服器埠
@Value("${server.port}")
private int port;
// 通過nio方式來接收連線和處理連線
private EventLoopGroup group = new NioEventLoopGroup();
@Autowired
private EchoClientHandler echoClientHandler;
/**
* 唯一標記
*/
private boolean initFalg = true;
/**
* 客戶端的是Bootstrap,服務端的則是 ServerBootstrap。
* 都是AbstractBootstrap的子類。
**/
public void run()
{
doConnect(new Bootstrap(), group);
}
/**
* 重連
*/
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup)
{
ChannelFuture f = null;
try
{
if (bootstrap != null)
{
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.remoteAddress(host, port);
// 設定通道初始化
bootstrap.handler(
new ChannelInitializer<SocketChannel>()
{
public void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(echoClientHandler);
}
}
);
f = bootstrap.connect().addListener((ChannelFuture futureListener) ->
{
final EventLoop eventLoop = futureListener.channel().eventLoop();
if (!futureListener.isSuccess())
{
System.out.println("與服務端斷開連線!在10s之後準備嘗試重連!");
eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
}
});
if (initFalg)
{
System.out.println("EchoClient客戶端連線成功!");
initFalg = false;
}
// 阻塞
f.channel().closeFuture().sync();
}
} catch (Exception e)
{
System.out.println("客戶端連線失敗!" + e.getMessage());
}
}
}
EchoClientHandler 客戶端業務邏輯處理器
要想處理接收到的資料,我們必須繼承ChannelInboundHandlerAdapter基類,重寫裡面的channelRead方法,每當有資料到達,此方法就會被呼叫(一般是Byte型別陣列),我們就在這裡寫我們的業務邏輯:
@Service("echoClientHandler")
public class EchoClientHandler extends ChannelInboundHandlerAdapter
{
/**
* 此方法會在連線到伺服器後被呼叫
*/
public void channelActive(ChannelHandlerContext ctx)
{
ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
/**
* 業務邏輯處理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
// 如果不是protobuf型別的資料
if (!(msg instanceof ByteBuf))
{
System.out.println("未知資料!" + msg);
return;
}
try
{
ByteBuf in = (ByteBuf) msg;
System.out.println("Client received: " +
ByteBufUtil.hexDump(in.readBytes(in.readableBytes())));
} catch (Exception e)
{
e.printStackTrace();
} finally
{
ReferenceCountUtil.release(msg);
}
}
/**
* 捕捉到異常
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
cause.printStackTrace();
ctx.close();
}
}
除了繼承ChannelInboundHandlerAdapter基類,我們的業務Handler還可以繼承 SimpleChannelInboundHandler 基類。
那麼這兩個有什麼區別呢?
-
SimpleChannelInboundHandler在接收到資料後會自動release掉資料佔用的Bytebuffer資源(自動呼叫Bytebuffer.release())。如果在channelRead方法返回前還沒有寫完資料,也就是當不能讓它自動release時,就不能繼承 SimpleChannelInboundHandler 基類。而繼承ChannelInboundHandlerAdapter則不會自動釋放,需要手動呼叫ReferenceCountUtil.release()等方法進行釋放。
-
SimpleChannelInboundHandler還有一個好處,可以在泛型引數中,可以直接指定好傳輸的資料格式。所以繼承該類,在處理資料時,不需要判斷資料格式。而繼承ChannelInboundHandlerAdapter則需要進行資料格式的判斷和轉換。
-
推薦在服務端去繼承ChannelInboundHandlerAdapter,建議手動進行釋放,防止資料未處理完就自動釋放了。
Spring 讀取配置檔案中的屬性值
在Netty 的程式中,一般需要用到伺服器ip和埠,最好的方式是放在配置檔案中,方便修改。
Spring Boot 預設的配置檔名稱為 application.properties,SpringApplication將從以下位置載入此檔案:
- 當前目錄下的/config子目錄,
- 當前目錄
- 一個classpath下的/config包
- classpath 根路徑(root)
一般情況下,工程在編譯之後,application.properties 放在classpath 根路徑下。
配置檔案 application.properties
#埠號 server.port=8081 #伺服器IP server.ip=127.0.0.1
注意:檔名字不能錯哦,是application.properties
關聯配置項到類屬性
在類域屬性上通過@Value("${配置項}")指定關聯屬性,Spring Application會自動載入。
public class EchoServer
{
// 伺服器埠
@Value("${server.port}")
private int port;
//...
}
啟動配置項自動掃描
使用 @Configuration、@EnableAutoConfiguration 啟動配置項的自動掃描。
//自動載入配置資訊
@Configuration
@EnableAutoConfiguration
//使包路徑下帶有@Value的註解自動注入
//使包路徑下帶有@Autowired的類可以自動注入
@ComponentScan("com.crazymakercircle.nettydemo.server")
@SpringBootApplication
public class ServerApp {
// ........
}
瘋狂創客圈 實戰計劃
- Netty 億級流量 高併發 IM後臺 開源專案實戰
- Netty 原始碼、原理、JAVA NIO 原理
- Java 面試題 一網打盡