1. 程式人生 > >Java NIO系列4:通道和選擇器

Java NIO系列4:通道和選擇器

前言

今天加班回來,終於有時間繼續更新NIO的文章了。在前一篇文章我們講解了緩衝區的知識,並通過程式碼演示瞭如何使用緩衝區的API完成一些操作。這裡要講的通道於緩衝區關係密切,簡單來說,緩衝區是填充資料的載體,而通道則可以理解為傳輸資料的載體。回憶在TCP/IP中建立握手的過程,傳送端有一個傳送緩衝區而接受端有一個接收緩衝區,程序從緩衝區中取資料,之後緩衝區又可以被填滿,而傳輸資料的網路則可以理解為通道。

通道基礎

相比JDK1.4之前的BIO,NIO一個重大的改變就是由原來的每次的連線都會建立一個新的執行緒區執行,變為只為每個請求建立執行緒,因為如果一個請求需要建立多次連線的話,BIO的效率就非常低下了。NIO處理事件的模型被稱為反應器模型。就是在每個請求到達的時候不會立即進行處理,而是會有一個分發執行緒將處理請求分發給具體的處理執行緒進行處理。這樣設計的好處在於能夠提高吞吐量,提高效能。

那麼,這裡要提到的通道有什麼關係呢?在服務端和客戶端有一個專門管理通道的物件,這個物件能夠監控每個通道的事件(後臺的實現邏輯就是不斷輪詢每個通道所註冊的事件,並判斷是否滿足要求),如果這個物件發現某個通道所註冊的事件發生了,那麼註冊該事件的通道就可以執行一些自己的處理。

在通道API中,頂層介面是Channel,程式碼如下:

package java.nio.channels;
public interface Channel
{
public boolean isOpen( );
public void close( ) throws IOException;
}

在該介面中知有開啟和關閉通道的方法,那麼這兩個方法夠用嗎?當然不夠,實際上更具體的方法都在不同的實現類中。而在通道的實現類中又可以分為兩類:FileChannel和SocketChannel。這兩種通道的開啟方式如下:

// 開啟Socket通道
SocketChannel sc = SocketChannel.open( );
sc.connect (new InetSocketAddress ("somehost", someport));
ServerSocketChannel ssc = ServerSocketChannel.open( );
ssc.socket( ).bind (new InetSocketAddress (somelocalport));
// 開啟UDP通道
DatagramChannel dc = DatagramChannel.open( );
// 開啟檔案通道
RandomAccessFile raf = new
RandomAccessFile ("somefile", "r"); FileChannel fc = raf.getChannel( );

通道實戰

通過以上的介紹,我們對通道有了一個基本的認識,下面主要演示如何通過程式碼的方式使用NIO中通道(由於在NIO應用最廣的是Socket通道,所以下面的例子都是基於Socket通道)。

通道的簡單使用

下面通過從通道拷貝資料帶緩衝區為例,對通道的基本使用做一個簡單演示:

package com.rhwayfun.patchwork.nio.channel;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

/**
 * @author: rhwayfun
 * @since: 2016-05-28
 */
public class ChannelCopy {
    public static void main(String[] args) throws IOException {
        // 建立一個源通道
        ReadableByteChannel source = Channels.newChannel(System.in);
        // 建立一個目標通道
        WritableByteChannel dest = Channels.newChannel(System.out);
        channelCopy(source,dest);
        // 關閉通道
        source.close();
        dest.close();
    }

    /**
     * 通道拷貝
     * @param source
     * @param dest
     * @throws IOException
     */
    private static void channelCopy(ReadableByteChannel source, WritableByteChannel dest) throws IOException {
        // 申請16 * 1024位元組大小的通道
        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
        // 呼叫read方法讀取緩衝區的資料
        while (source.read(buffer) != -1){
            // 翻轉緩衝區,執行的操作:
            // 1、將limit的位置設為position之後的一個位置
            // 2、將position的位置重置未0
            buffer.flip();
            // 當緩衝區還有資料的話就寫到目標通道中
            while (buffer.hasRemaining()){
                dest.write(buffer);
            }
            // 清空緩衝區
            buffer.clear();
        }
    }

    /**
     * 通道拷貝的另一種方式
     * @param source
     * @param dest
     * @throws IOException
     */
    private static void channelCopy2(ReadableByteChannel source, WritableByteChannel dest) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
        while (source.read(buffer) != -1){
            // 翻轉緩衝區
            buffer.flip();
            // 將緩衝區的資料寫道目標通道
            dest.write(buffer);
            // 如果只寫了一部分資料,將空間進行壓縮,可以重複利用空間
            buffer.compact();
        }
        // 翻轉緩衝區
        buffer.flip();
        // 將剩餘的資料寫入目標緩衝區
        while (buffer.hasRemaining()){
            dest.write(buffer);
        }
    }
}

TCP伺服器

在前面提到有一個物件對服務端和客戶端的通道進行管理,這個物件就是Selector,可以理解為選擇器,這個選擇器就是為通道服務的。伺服器和客戶端可以註冊自己感興趣的事件,這樣Selector就可以不同的多個通道伺服器的狀態。通常Selector上可以註冊的事件型別如下:

事件描述 事件定義
服務端接收客戶端連線事件 SelectionKey.OP_ACCEPT(16)
客戶端連線服務端事件 SelectionKey.OP_CONNECT(8)
讀事件 SelectionKey.OP_READ(1)
寫事件 SelectionKey.OP_WRITE(4)

比如伺服器在Selector物件上註冊了OP_ACCEPT事件,那麼當有客戶端連線上的時候,該事件就可以被響應。

下面實現了一個簡單的TCP伺服器和客戶端:

客戶端

package com.rhwayfun.patchwork.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * @author: rhwayfun
 * @since: 2016-05-28
 */
public class SelectorClient {

    // 連線的主機
    private String host;
    // 主機的埠
    private int port;
    // 選擇器
    private Selector selector;
    // 通道
    private SocketChannel socketChannel;

    public SelectorClient(String host,int port){
        this.host = host;
        this.port = port;
        try {
            init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void init() throws IOException {
        // 開啟一個選擇器
        selector = Selector.open();
        // 開啟一個通道
        socketChannel = SocketChannel.open(new InetSocketAddress(host,port));
        // 要繫結的地址
        //SocketAddress remoteAddress = new InetSocketAddress(host,port);
        // 繫結到指定的地址
        //socketChannel.bind(remoteAddress);
        // 配置為非阻塞模式
        socketChannel.configureBlocking(false);
        // 註冊到選擇器上
        socketChannel.register(selector, SelectionKey.OP_READ);

        // 監聽來自服務端的響應
        new SelectorThread(selector).start();
    }

    public void writeDataToServer(String message) throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes("UTF-8"));
        socketChannel.write(writeBuffer);
    }

    public static void main(String[] args) throws IOException {
        SelectorClient client = new SelectorClient("localhost",6666);
        client.writeDataToServer("我是一個客戶端");
    }
}

伺服器

package com.rhwayfun.patchwork.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * @author: rhwayfun
 * @since: 2016-05-28
 */
public class SelectorServer {

    // 伺服器監聽的埠
    private static final int PORT = 6666;
    // 處理資料的緩衝區
    private ByteBuffer buffer = ByteBuffer.allocate(1024);
    // 歡迎訊息
    private static final String GREETING = "Welcome to here.";

    public static void main(String[] args) {
        new SelectorServer().start(args);
    }

    private void start(String[] args) {
        int port = PORT;
        if (args.length == 1){
            port = Integer.valueOf(args[0]);
        }
        System.out.println("listening on port " + port);
        Iterator<SelectionKey> iterator = null;
        try {
            //建立一個ServerChannel
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            //獲取通道關聯的Socket物件
            ServerSocket serverSocket = serverChannel.socket();
            //要繫結的地址
            SocketAddress address = new InetSocketAddress(port);
            //建立需要註冊的選擇器
            Selector selector = Selector.open();

            //把socket物件繫結到指定的地址
            serverSocket.bind(address);

            //配置為非阻塞模式
            serverChannel.configureBlocking(false);

            //註冊通道到選擇器
            //第二個引數表名serverChannel感興趣的事件是OP_ACCEPT型別的事件
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            // 選擇器不斷迴圈從選擇器中選取已經準備好的通道進行操作
            // 選取之後,會對其感興趣的事件進行處理。將感興趣的事件
            // 處理完畢後將key從集合中刪除,表示該通道的事件已經處
            // 理完畢

            while (true){
                // 這個操作可能會被阻塞,因為不知道註冊在這個選擇器上的通道是否準備好了
                int n = selector.select();
                if (n == 0){
                    continue;
                }

                // 獲取SelectionKey的迭代器物件
                iterator  = selector.selectedKeys().iterator();

                while (iterator.hasNext()){
                    // 獲取這個key關聯的通道
                    SelectionKey key = iterator.next();
                    // 判斷感興趣的事件型別
                    if (key.isAcceptable()){
                        // 這裡可以強制轉換為ServerSocketChannel
                        // 因為在這個選擇器上目前只註冊了一個該型別的通道
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        // 呼叫accept方法可以得到連線到此地址的客戶端連線
                        SocketChannel channel = server.accept();

                        // 註冊客戶端連線到選擇器上,並把感興趣的事件型別設為可讀型別
                        registerChannel(selector,channel,SelectionKey.OP_READ);

                        // 給客戶端傳送響應訊息
                        sayHello(channel);
                    }

                    // 如果是可讀型別的事件,則獲取傳輸過來的資料
                    if (key.isReadable()){
                        readDataFromClient(key);
                    }

                    // 將已經處理的key從集合中刪除
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            iterator.remove();
        }

    }

    /**
     *
     * @param key
     */
    private void readDataFromClient(SelectionKey key) throws IOException {
        // 獲取key管理的Channel物件
        SocketChannel channel = (SocketChannel) key.channel();
        // 讀取之前需要清空緩衝區
        buffer.clear();
        if (channel.read(buffer) < 0){
            channel.close();
        }else {
            buffer.flip();
            String receiveMsg = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
            System.out.println("receive client message: " + receiveMsg + " from " + channel.getRemoteAddress());
        }
    }

    /**
     * 向客戶端傳送響應訊息
     * @param channel
     * @throws IOException
     */
    private void sayHello(SocketChannel channel) throws IOException {
        buffer.clear();
        buffer.put(GREETING.getBytes());
        buffer.flip();
        channel.write(buffer);
    }

    /**
     * 註冊客戶端連線到選擇器上
     * @param selector
     * @param channel
     * @param opRead
     * @throws IOException
     */
    private void registerChannel(Selector selector, SocketChannel channel, int opRead) throws IOException {
        if (channel == null){
            return;
        }
        // 設為非阻塞模式
        channel.configureBlocking(false);
        // 註冊該channel到選擇器上
        channel.register(selector,opRead);
    }

}

通道執行緒

package com.rhwayfun.patchwork.nio.selector;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * @author: rhwayfun
 * @since: 2016-05-28
 */
public class SelectorThread extends  Thread{

    private Selector selector;

    public SelectorThread(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            // 獲取Selector註冊的通道數
            int n = selector.select();
            while (n > 0){
                // selector.selectedKeys()可以獲取每個註冊通道的key
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    if (key.isReadable()){
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        channel.read(buffer);
                        buffer.flip();
                        String receiveMsg = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
                        System.out.println("receive server message: " + receiveMsg + " from " + channel.getRemoteAddress());
                        key.interestOps(SelectionKey.OP_READ);
                    }
                    // 處理下一個事件
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

以上的程式碼演示如何使用NIO的相關API建立TCP伺服器,把上面的程式碼理解到位,對NIO的API就掌握的差不多了。其實,在NIO程式設計中,一個重要的思想是非阻塞模式,選擇器就是這種思想的體現,體會Selector在通道監聽有助於理解非阻塞模式的應用場景。

小結

這樣,我們通過實際的程式碼演示瞭如何使用NIO的相關API實現一個簡單的TCP伺服器,關鍵在於理解NIO模型的原理,非阻塞模式是NIO的思想核心。