1. 程式人生 > >Day14.高效能RPC設計 學習筆記2

Day14.高效能RPC設計 學習筆記2

一、通道選擇器

nio

通道註冊:需要使用Selector管理通道,然後將就緒的通道封裝成SelectionKey物件。

  • 設定通道為非阻塞 ServerSocketChannel/SocketChannel#configureBlocking(false)
  • 註冊通道ServerSocketChannel/SocketChannel#register(selector,事件型別[,附件資訊])

NIO的網路程式設計的思想是基於非同步事件處理,底層通過Selector去管理註冊列表,一旦註冊列表的相關通道就緒,selector就會將就緒的通道放置在事件處理佇列中,使用者可以通過Selector#selectedKeys()

獲取就緒的keys

所有就緒的key只能被處理一次,因此使用者必須在處理完事件key後,將該事件在事件處理列表中移除。

事件取消註冊通道關閉,key#cancel()

ServerSocketChannel ssc=ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(9999));
//設定通道非阻塞
ssc.configureBlocking(false);

//建立通道選擇器
Selector selector=Selector.open();
//註冊ACCEPT事件型別 轉發
ssc.register(
selector,SelectionKey.OP_ACCEPT); //迭代遍歷事件key while(true){ //返回需要處理的事件個數,如果沒有該方法會阻塞,也有可能直接返回0(當程式呼叫Selector#wakeup) int num = selector.select(); if(num >0){ //事件處理 Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ SelectionKey key =
keys.next(); //處理對應的事件key if(key.isAcceptable()){ //處理轉發事件 ServerSocketChannel channel= (ServerSocketChannel) key.channel(); SocketChannel s=channel.accept();//立即返回一個不為null的SocketChannel s.configureBlocking(false);//註冊讀 s.register(selector,SelectionKey.OP_READ); }else if(key.isReadable()){ //處理讀事件 SocketChannel s= (SocketChannel) key.channel(); //處理讀 ... //註冊寫 s.register(selector,SelectionKey.OP_WRITE[,請求引數]); }else if(key.isWritable()){ //處理寫事件 SocketChannel s= (SocketChannel) key.channel(); //根據請求引數給出響應 ... s.shutdownOutput();//告知寫結束 s.close()} //移除key keys.remove(); } } }

二、NIO單執行緒版本

public class NIOBootstrapServer {
    public static void main(String[] args) throws IOException {
        //1、建立ServerSocket
        ServerSocketChannel ssc=ServerSocketChannel.open();
        //2、繫結監聽埠
        ssc.bind(new InetSocketAddress(9999));
        //3、設定通道非阻塞
        ssc.configureBlocking(false);

        //4、建立通道選擇器
        Selector selector= Selector.open(); //nio多是open來註冊東西
        //5、註冊ACCEPT事件型別 轉發
        ssc.register(selector,SelectionKey.OP_ACCEPT);

        //迭代遍歷事件key
        while(true){
            //返回需要處理的事件個數,如果沒有該方法會阻塞,也有可能直接返回0(當程式呼叫Selector#wakeup)
            System.out.println("嘗試選擇待處理的keys...");
            //可以出來keys的數目,如果沒有該方法block
            int num = selector.select();

            if(num >0){
                //事件處理
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while(keys.hasNext()){
                    SelectionKey key = keys.next();
                    //處理對應的事件key
                    if(key.isAcceptable()){
                        System.out.println("處理轉發同時註冊讀...");
                        //處理轉發事件
                        ServerSocketChannel channel= (ServerSocketChannel) key.channel();

                        SocketChannel s=channel.accept();//立即返回一個不為null的SocketChannel
                        s.configureBlocking(false);//設定非阻塞
                        //註冊讀
                        s.register(selector,SelectionKey.OP_READ,new ByteArrayOutputStream());
                    }else if(key.isReadable()){
                        System.out.println("處理讀...");
                        //處理讀事件
                        SocketChannel s= (SocketChannel) key.channel(); //拿到註冊過的SocketChannel
                        //處理讀
                        ByteBuffer buffer=ByteBuffer.allocate(1024);
                        ByteArrayOutputStream baos= (ByteArrayOutputStream) key.attachment();

                        //一次嘗試讀取一個緩衝區
                        int n=s.read(buffer);
                        if(n==-1){
                            System.out.println("伺服器收到:"+new String(baos.toByteArray()));
                            //根據請求引數給出響應
                            ByteArrayInputStream bais=new ByteArrayInputStream((new Date().toLocaleString()).getBytes());
                            //註冊寫
                            s.register(selector, SelectionKey.OP_WRITE,bais);
                        }else{
                            buffer.flip();
                            baos.write(buffer.array(),0,n);
                        }

                    }else if(key.isWritable()){
                        System.out.println("處理寫...");
                        //處理寫事件
                        SocketChannel s= (SocketChannel) key.channel();
                        ByteArrayInputStream bais = (ByteArrayInputStream)key.attachment();

                        byte[] bytes=new byte[1024];
                        int n = bais.read(bytes);
                        if(n==-1){
                            s.shutdownOutput();//告知寫結束
                            s.close(); //關閉通道
                        }else{
                            s.write(ByteBuffer.wrap(bytes,0,n));
                        }
                    }
                    //移除key
                    keys.remove();
                }
            }
        }
    }
}

三、NIO多執行緒版【瞭解】

圖02

依賴

<dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-lang3</artifactId>
   <version>3.6</version>
</dependency>

完整實現如下

public class NIOBootstrapServerPool {
    //該執行緒池主要負責請求的轉發
    private static ExecutorService master= Executors.newFixedThreadPool(66);
    //該執行緒池主要負責請求的響應
    private static ExecutorService worker= Executors.newFixedThreadPool(66);

    //註冊轉發佇列
    private static final AtomicBoolean NEED_REG_DISPATH= new AtomicBoolean(false);
    //註冊讀佇列
    private static final List<ChannelAndAtt> READ_QUEUE= new Vector<ChannelAndAtt>();
    //註冊寫佇列
    private static final CopyOnWriteArrayList<ChannelAndAtt> WRITE_QUEUE= new CopyOnWriteArrayList<ChannelAndAtt>();

    public static void main(String[] args) throws IOException {
        //建立serverSocket
        ServerSocketChannel ssc=ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(9999));
        //設定通道非阻塞
        ssc.configureBlocking(false);
        //建立通道選擇器selector
        Selector selector= Selector.open();
        //註冊ACCEPT事件型別 轉發
        ssc.register(selector,SelectionKey.OP_ACCEPT);

        //迭代遍歷事件key
        while(true){
            //返回需要處理的事件個數,如果沒有該方法會阻塞,也有可能直接返回0(當程式呼叫Selector#wakeup)
            // System.out.println("嘗試選擇待處理的keys...");
            int num = selector.select(1);
            if(num >0){
                //事件處理
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while(keys.hasNext()){
                    SelectionKey key = keys.next();
                    //處理對應的事件key
                    if(key.isAcceptable()){ //請求轉發
                        key.cancel();//取消轉發註冊
                        master.submit(new ProcessDispatcher(key,selector));
                    }else if(key.isReadable()){ //讀取IO處理
                        key.cancel();//取消讀註冊
                        worker.submit(new ProcessRead(key,selector));
                    }else if(key.isWritable()){ //響應IO處理
                        key.cancel();//取消寫註冊
                        worker.submit(new ProcessWrite(key,selector));
                    }
                    //刪除當前事件key,刪除並不意味著取消註冊
                    keys.remove();
                }
            }else {
                if(NEED_REG_DISPATH.get()){//需要重新註冊ACCEPT
                    System.out.println("重新註冊ACCEPT");
                    ssc.register(selector,SelectionKey.OP_ACCEPT);
                    NEED_REG_DISPATH.set(false);
                }
                while(READ_QUEUE.size()>0){
                    ChannelAndAtt channelAndAtt = READ_QUEUE.remove(0);
                    //註冊讀
                    System.out.println("註冊READ");
                    channelAndAtt.getChannel().register(selector,SelectionKey.OP_READ,channelAndAtt.att);
                }
                while(WRITE_QUEUE.size()>0){
                    ChannelAndAtt channelAndAtt = WRITE_QUEUE.remove(0);
                    //註冊寫
                    System.out.println("註冊寫");
                    channelAndAtt.getChannel().register(selector,SelectionKey.OP_WRITE,channelAndAtt.att);
                }
            }
        }
    }
    /**
     * 處理請求寫
     */
    public static class ProcessWrite implements Runnable{
        private SelectionKey key;
        private Selector selector;

        public ProcessWrite(SelectionKey key, Selector selector) {
            this.key = key;
            this.selector = selector;
        }
        @Override
        public void run() {
            try {
                SocketChannel s= (SocketChannel) key.channel();
                ByteArrayInputStream bais = (ByteArrayInputStream)key.attachment();

                byte[] bytes=new byte[1024];
                int n = bais.read(bytes);//最多從bais獲取一個緩衝區的資料

                if(n==-1){
                    s.shutdownOutput();//告知寫結束
                    s.close(); //關閉通道
                }else{
                    //最多寫一個緩衝區的資料
                    s.write(ByteBuffer.wrap(bytes,0,n));
                    //恢復寫註冊
                    WRITE_QUEUE.add(new ChannelAndAtt(s,bais));
                }
                //打破main執行緒
                selector.wakeup();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 處理請求讀
     */
    public static class ProcessRead implements Runnable{
        private SelectionKey key;
        private Selector selector;

        public ProcessRead(SelectionKey key, Selector selector) {
            this.key = key;
            this.selector = selector;
        }
        @Override
        public void run() {
            try {
                //處理讀事件
                SocketChannel s= (SocketChannel) key.channel();
                //處理讀
                ByteBuffer buffer=ByteBuffer.allocate(1024);
                ByteArrayOutputStream baos= (ByteArrayOutputStream) key.attachment();

                int n=s.read(buffer);
                if(n==-1){
                    //根據請求引數給出響應
                    Object req= SerializationUtils.deserialize(baos.toByteArray());
                    System.out.println("伺服器收到:"+req);
                    ByteArrayInputStream bais=new ByteArrayInputStream(SerializationUtils.serialize(new Date()));
                    //註冊寫
                    WRITE_QUEUE.add(new ChannelAndAtt(s,bais));
                }else{
                    buffer.flip();
                    baos.write(buffer.array(),0,n);
                    //恢復讀註冊
                    READ_QUEUE.add(new ChannelAndAtt(s,baos));
                }
                //打斷mian執行緒阻塞
                selector.wakeup();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 處理請求轉發
     */
    public static class ProcessDispatcher implements Runnable{
        private SelectionKey key;
        private Selector selector;

        public ProcessDispatcher(SelectionKey key, Selector selector) {
            this.key = key;
            this.selector = selector;
        }
        @Override
        public void run() {
            try {
                //獲取通道
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel s = ssc.accept();
                s.configureBlocking(false);
                /