1. 程式人生 > >Java NIO——selector

Java NIO——selector

文章目錄


Java NIO程式設計例項之三Selector

1.概念

選擇器(Selector) 是 SelectableChannle 物件的多路複用器, Selector 可以同時監控多個 SelectableChannel 的 IO 狀況,也就是說,利用 Selector可使一個單獨的執行緒管理多個 Channel。 Selector 是非阻塞 IO 的核心。
Selector、SelectionKey和SelectableChannel,它們之間的關係如下圖所示:
在這裡插入圖片描述


SelectableChannel是一類可以與Selector進行配合的通道,例如Socket相關通道以及Pipe產生的通道都屬於SelectableChannel。這類通道可以將自己感興趣的操作(例如read、write、accept和connect)註冊到一個
Selector是一個控制器,它負責管理已註冊的多個SelectableChannel,當這些通道的某些狀態改變時,Selector會被喚醒(從select()方法的阻塞中),並對所有就緒的通道進行輪詢操作。

2 selectionKey

當呼叫 register(Selector sel, int ops) 將通道註冊選擇器時,選擇器對通道的監聽事件,需要通過第二個引數 ops 指定可以監聽的事件型別(可使用 SelectionKey 的四個常量表示):

  • 讀 : SelectionKey.OP_READ (1)
  • 寫 : SelectionKey.OP_WRITE (4)
  • 連線 : SelectionKey.OP_CONNECT (8)
  • 接收 : SelectionKey.OP_ACCEPT (16)
    若註冊時不止監聽一個事件,則可以使用“位或”操作符連線。
int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITE;

selectionKey常用方法

方法 功能
int interestOps() 返回所感興趣的事件集合
int readyOps() 返回已經準備就緒的集合
boolean isAcceptable() 接收是否就緒
boolean isConnectable() 連線是否就緒
boolean isReadable() 讀操作是否就緒
boolean isWritable 寫操作是否就緒
Channel channel 獲取通道
Selector selector 獲取選擇器
attach(Object o) 將物件或資料附著到SelectionKey上
Object attachment() 獲取附著物件

3 常用方法

3.1 register(Selector selector,int ops)

SelectionKey register(Selector sel, int ops)

方法的返回值是一個SelectionKey,這個物件會被自動加入Selector的keys集合,因此不必特意保留這個SelectionKey的物件引用,需要時可以使用Selector的keys()方法得到所有的SelectionKey物件引用。
註冊完成後,該通道就與Selector保持關聯了。當通道的狀態改變時,其改變會自動被Selector感知,並在Selector的三個集合中反應出來。

3.2 三個集合

  • Set keys(),獲取keys集合,儲存了所有與selector關聯的Selectionkey物件
  • Set selectedKeys(),獲取selectedKeys集合,儲存了在一次select()方法呼叫後,所有狀態改變的通道關聯的SelectionKey物件
  • cancelledKeys集合,儲存了一輪select()方法呼叫過程中,所有被取消但還未從keys中刪除的SelectionKey物件

3.3 select方法

  • int select(),會一直阻塞,直到至少有一個註冊的通道狀態改變,才會被喚醒
  • int select(long timeout),一直阻塞,直到時間耗盡,或者有通道的狀態改變。

3.4 wakeUp

3.5 close

4 demo

public class SelectorServer {
    private static final int PORT = 1234;
    private static ByteBuffer buffer = ByteBuffer.allocate(1024);

    public static void main(String[] args) {
        try {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(PORT));
            ssc.configureBlocking(false);
            //1.register()
            Selector selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("REGISTER CHANNEL , CHANNEL NUMBER IS:" + selector.keys().size());

            while (true) {
                //2.select()
                int n = selector.select();
                if (n == 0) {
                    continue;
                }
                //3.輪詢SelectionKey
                Iterator<SelectionKey> iterator = (Iterator) selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    //如果滿足Acceptable條件,則必定是一個ServerSocketChannel
                    if (key.isAcceptable()) {
                        ServerSocketChannel sscTemp = (ServerSocketChannel) key.channel();
                        //得到一個連線好的SocketChannel,並把它註冊到Selector上,興趣操作為READ
                        SocketChannel socketChannel = sscTemp.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("REGISTER CHANNEL , CHANNEL NUMBER IS:" + selector.keys().size());
                    }
                    //如果滿足Readable條件,則必定是一個SocketChannel
                    if (key.isReadable()) {
                        //讀取通道中的資料
                        SocketChannel channel = (SocketChannel) key.channel();
                        readFromChannel(channel);
                    }
                    //4.remove SelectionKey
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void readFromChannel(SocketChannel channel) {
        buffer.clear();
        try {
            while (channel.read(buffer) > 0) {
                buffer.flip();
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                System.out.println("READ FROM CLIENT:" + new String(bytes));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

首先註冊了一個ServerSocketChannel,它用來監聽1234埠上的連線;當監聽到連線時,把連線上的SocketChannel再註冊到Selector上,這些SocketChannel註冊的是SelectionKey.OP_READ事件;當這些SocketChannel狀態變為可讀時,讀取資料並顯示。

public class SelectorClient {
    static class Client extends Thread {
        private String name;
        private Random random = new Random(47);

        Client(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                SocketChannel channel = SocketChannel.open();
                channel.configureBlocking(false);
                channel.connect(new InetSocketAddress(1234));
                while (!channel.finishConnect()) {
                    TimeUnit.MILLISECONDS.sleep(100);
                }
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                for (int i = 0; i < 5; i++) {
                    TimeUnit.MILLISECONDS.sleep(100 * random.nextInt(10));
                    String str = "Message from " + name + ", number:" + i;
                    buffer.put(str.getBytes());
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        channel.write(buffer);
                    }
                    buffer.clear();
                }
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new Client("Client-1"));
        executorService.submit(new Client("Client-2"));
        executorService.submit(new Client("Client-3"));
        executorService.shutdown();
    }
}