hadoop26----netty,多個handler
阿新 • • 發佈:2018-05-13
異步 finally exception throws 被調用 決定 指定 關閉連接 order
k客戶端:
package cn.itcast_03_netty.sendorder.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; import cn.itcast_03_netty.sendobject.coder.PersonEncoder; /** * ? 連接服務器 ? 寫數據到服務器 ? 等待接受服務器返回相同的數據 ? 關閉連接 * * @author wilson * */ public class EchoClient { private final String host; private final intport; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup nioEventLoopGroup = null; try { // 客戶端引導類 Bootstrap bootstrap = new Bootstrap();// EventLoopGroup可以理解為是一個線程池,這個線程池用來處理連接、接受數據、發送數據 nioEventLoopGroup = new NioEventLoopGroup(); bootstrap.group(nioEventLoopGroup)//多線程處理 .channel(NioSocketChannel.class)//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel .remoteAddress(new InetSocketAddress(host, port))//地址 .handler(new ChannelInitializer<SocketChannel>() {//業務處理類 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler());//註冊handler } }); // 鏈接服務器 ChannelFuture channelFuture = bootstrap.connect().sync(); channelFuture.channel().closeFuture().sync(); } finally { nioEventLoopGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient("localhost", 20000).start(); } } /* 客戶端連接服務器,開始發送數據…… client 讀取server數據.. 服務端數據為 :Sun May 13 10:32:47 CST 2018 */
package cn.itcast_03_netty.sendorder.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import cn.itcast_03_netty.sendobject.bean.Person; public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { // 客戶端連接服務器後被調用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端連接服務器,開始發送數據……"); byte[] req = "QUERY TIME ORDER".getBytes();//消息 ByteBuf firstMessage = Unpooled.buffer(req.length);//發送類 firstMessage.writeBytes(req);//發送 ctx.writeAndFlush(firstMessage);//flush } // ? 從服務器接收到數據後調用 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client 讀取server數據.."); // 服務端返回消息後 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("服務端數據為 :" + body); } // ? 發生異常時被調用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exceptionCaught.."); // 釋放資源 ctx.close(); } }
服務端:
package cn.itcast_03_netty.sendorder.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import cn.itcast_03_netty.sendobject.coder.PersonDecoder; /** * ? 配置服務器功能,如線程、端口 ? 實現服務器處理程序,它包含業務邏輯,決定當有一個請求連接或接收數據時該做什麽 * * @author wilson * */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup eventLoopGroup = null; try { //server端引導類 ServerBootstrap serverBootstrap = new ServerBootstrap(); //連接池處理數據 eventLoopGroup = new NioEventLoopGroup(); serverBootstrap.group(eventLoopGroup) .channel(NioServerSocketChannel.class)//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel .localAddress("localhost",port)//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端連接。 .childHandler(new ChannelInitializer<Channel>() {//設置childHandler執行所有的連接請求 @Override protected void initChannel(Channel ch) throws Exception { // 註冊兩個InboundHandler,執行順序為註冊順序,所以應該是InboundHandler1 InboundHandler2 // 註冊兩個OutboundHandler,執行順序為註冊順序的逆序,所以應該是OutboundHandler2 OutboundHandler1 ch.pipeline().addLast(new EchoInHandler1()); ch.pipeline().addLast(new EchoInHandler2()); ch.pipeline().addLast(new EchoOutHandler1()); ch.pipeline().addLast(new EchoOutHandler2()); } }); // 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,然後服務器等待通道關閉,因為使用sync(),所以關閉操作也會被阻塞。 ChannelFuture channelFuture = serverBootstrap.bind().sync(); System.out.println("開始監聽,端口為:" + channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoServer(20000).start(); } } /* 開始監聽,端口為:/127.0.0.1:20000 in1 in2 接收客戶端數據:QUERY TIME ORDER server向client發送數據 Complete1 */
package cn.itcast_03_netty.sendorder.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; import cn.itcast_03_netty.sendobject.bean.Person; public class EchoInHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("in1"); // 通知執行下一個InboundHandler ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("Complete1"); ctx.flush();//刷新後才將數據發出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
package cn.itcast_03_netty.sendorder.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; import cn.itcast_03_netty.sendobject.bean.Person; public class EchoInHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("in2"); ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("接收客戶端數據:" + body); //向客戶端寫數據 System.out.println("server向client發送數據"); String currentTime = new Date(System.currentTimeMillis()).toString(); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("Complete2"); ctx.flush();//刷新後才將數據發出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
package cn.itcast_03_netty.sendorder.server; import java.util.Date; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter { @Override // 向client發送消息 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("out1"); /*System.out.println(msg);*/ String currentTime = new Date(System.currentTimeMillis()).toString(); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); ctx.flush(); } }
package cn.itcast_03_netty.sendorder.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("out2"); // 執行下一個OutboundHandler /*System.out.println("at first..msg = "+msg); msg = "hi newed in out2";*/ super.write(ctx, msg, promise); } }
hadoop26----netty,多個handler