1. 程式人生 > >hadoop26----netty,多個handler

hadoop26----netty,多個handler

異步 finally exception throws 被調用 決定 指定 關閉連接 order

k客戶端:

package cn.itcast_03_netty.sendorder.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; import cn.itcast_03_netty.sendobject.coder.PersonEncoder; /** * ? 連接服務器 ? 寫數據到服務器 ? 等待接受服務器返回相同的數據 ? 關閉連接 * * @author wilson * */ public class EchoClient { private final String host; private final int
port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup nioEventLoopGroup = null; try { // 客戶端引導類 Bootstrap bootstrap = new Bootstrap();
// EventLoopGroup可以理解為是一個線程池,這個線程池用來處理連接、接受數據、發送數據 nioEventLoopGroup = new NioEventLoopGroup(); bootstrap.group(nioEventLoopGroup)//多線程處理 .channel(NioSocketChannel.class)//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel .remoteAddress(new InetSocketAddress(host, port))//地址 .handler(new ChannelInitializer<SocketChannel>() {//業務處理類 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler());//註冊handler } }); // 鏈接服務器 ChannelFuture channelFuture = bootstrap.connect().sync(); channelFuture.channel().closeFuture().sync(); } finally { nioEventLoopGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient("localhost", 20000).start(); } } /* 客戶端連接服務器,開始發送數據…… client 讀取server數據.. 服務端數據為 :Sun May 13 10:32:47 CST 2018 */
package cn.itcast_03_netty.sendorder.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import cn.itcast_03_netty.sendobject.bean.Person;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    // 客戶端連接服務器後被調用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客戶端連接服務器,開始發送數據……");
        byte[] req = "QUERY TIME ORDER".getBytes();//消息
        ByteBuf firstMessage = Unpooled.buffer(req.length);//發送類
        firstMessage.writeBytes(req);//發送
        ctx.writeAndFlush(firstMessage);//flush
    }

    // ? 從服務器接收到數據後調用
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
            throws Exception {
        System.out.println("client 讀取server數據..");
        // 服務端返回消息後
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("服務端數據為 :" + body);
    }

    // ? 發生異常時被調用
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.out.println("client exceptionCaught..");
        // 釋放資源
        ctx.close();
    }
}

服務端:

package cn.itcast_03_netty.sendorder.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import cn.itcast_03_netty.sendobject.coder.PersonDecoder;

/**
 * ? 配置服務器功能,如線程、端口 ? 實現服務器處理程序,它包含業務邏輯,決定當有一個請求連接或接收數據時該做什麽
 * 
 * @author wilson
 *
 */
public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup eventLoopGroup = null;
        try {
            //server端引導類
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //連接池處理數據
            eventLoopGroup = new NioEventLoopGroup();
            serverBootstrap.group(eventLoopGroup)
            .channel(NioServerSocketChannel.class)//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel
            .localAddress("localhost",port)//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端連接。
            .childHandler(new ChannelInitializer<Channel>() {//設置childHandler執行所有的連接請求
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    // 註冊兩個InboundHandler,執行順序為註冊順序,所以應該是InboundHandler1 InboundHandler2
                    // 註冊兩個OutboundHandler,執行順序為註冊順序的逆序,所以應該是OutboundHandler2 OutboundHandler1
                    ch.pipeline().addLast(new EchoInHandler1());
                    ch.pipeline().addLast(new EchoInHandler2());
                    ch.pipeline().addLast(new EchoOutHandler1());
                    ch.pipeline().addLast(new EchoOutHandler2()); 
                    
                }
                    });
            // 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,然後服務器等待通道關閉,因為使用sync(),所以關閉操作也會被阻塞。
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            System.out.println("開始監聽,端口為:" + channelFuture.channel().localAddress());
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(20000).start();
    }
}

/*
開始監聽,端口為:/127.0.0.1:20000
in1
in2
接收客戶端數據:QUERY TIME ORDER
server向client發送數據
Complete1
*/
package cn.itcast_03_netty.sendorder.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

import cn.itcast_03_netty.sendobject.bean.Person;

public class EchoInHandler1 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("in1");
         // 通知執行下一個InboundHandler
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Complete1");
        ctx.flush();//刷新後才將數據發出到SocketChannel
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}
package cn.itcast_03_netty.sendorder.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

import cn.itcast_03_netty.sendobject.bean.Person;

public class EchoInHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("in2");
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("接收客戶端數據:" + body);
        //向客戶端寫數據
        System.out.println("server向client發送數據");
        String currentTime = new Date(System.currentTimeMillis()).toString();
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
        
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Complete2");
        ctx.flush();//刷新後才將數據發出到SocketChannel
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}
package cn.itcast_03_netty.sendorder.server;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {
    @Override
    // 向client發送消息
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("out1");
        /*System.out.println(msg);*/
        
        String currentTime = new Date(System.currentTimeMillis()).toString();
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
        ctx.flush();
       }
}
package cn.itcast_03_netty.sendorder.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {

     @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("out2");
            // 執行下一個OutboundHandler
            /*System.out.println("at first..msg = "+msg);
            msg = "hi newed in out2";*/
            super.write(ctx, msg, promise);
        }

}

hadoop26----netty,多個handler