1. 程式人生 > >Java NIO簡單例項(入門)

Java NIO簡單例項(入門)

 最近學習了一下Java NIO的開發,剛開始接觸selector,覺得有點繞,弄的有點暈,所以在這裡寫幾個簡單的例子,記錄一下,也與大家分享一下,對剛開始學習NIO的同學,希望有一些幫忙。大神就請多指點了。
 開發穩定NIO對工程師的要求很高,NIO本身也存在很多的BUG,本文的例子只簡單的幫助簡單NIO的一些概念,對於一些例如TCP粘包/拆包等問題,不予以考慮。
 對於NIO的一些概念什麼的,就不在這裡闡述了,給大家推薦幾個文章,可以參考一下,裡面有詳細的解釋。
 Java NIO基本概念:
     (1) http://ifeve.com/java-nio-all/
     (2)

http://blog.csdn.net/jeffleo/article/details/54695959?locationNum=6
 Java NIO開發的注意事項:
     (1)http://blog.csdn.net/martin_liang/article/details/41224503

例項一:不使用selector實現埠通訊

import org.apache.log4j.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;

public
class SocketTimePrint { private static final Logger logger = Logger.getLogger(SocketTimePrint.class); public static void main(String[] args) throws Exception{ Thread server = new Thread(new ServerMsgNoselector()); Thread client = new Thread(new ClientMsg()); logger.info("Beginning to start the server for message"
); server.start(); Thread.sleep(100); logger.info("Beginning to start the client for message"); client.start(); } } class ClientMsg implements Runnable { private static final Logger logger = Logger.getLogger(ClientMsg.class); @Override public void run() { try { SocketChannel socket = SocketChannel.open(); socket.connect(new InetSocketAddress("127.0.0.1", 9999)); socket.configureBlocking(false); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); while (true) { String datestr = format.format(new Date()); ByteBuffer buff = ByteBuffer.wrap(datestr.getBytes()); socket.write(buff); Thread.sleep(2000); } } catch (IOException | InterruptedException ie) { ie.printStackTrace(); } } } class ServerMsgNoselector implements Runnable { private static final Logger logger = Logger.getLogger(ServerMsgNoselector.class); @Override public void run() { try { ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(9999)); serverChannel.configureBlocking(true); SocketChannel client = serverChannel.accept(); client.configureBlocking(true); ByteBuffer buffer = ByteBuffer.allocate(1024); while(true) { int len = client.read(buffer); if (len > 0 && buffer.hasArray()) { String msg = new String(buffer.array(), 0, len); logger.info("The recvicing message is " + msg); } else { logger.info("Waiting for message ..."); } buffer.clear(); } }catch(Exception ie) { ie.printStackTrace(); } } }

 例子很簡單,分別在兩個程序中啟動一個client與一個server,通過9999埠進行通訊。服務端的channel都設定為了阻塞模式。該例子中,只有一個客戶端與一個server,而當在實際應用時,client會有很多連線,我可能也不只監聽一個埠,這時就會出現各種問題(可能看一下《Netty權威指南》裡面對IO的基本概念及Java IO的演進有一個大致的介紹),而selector就是為了解決這些問題,以達到更高的效能。

例項二:基於selector實現的通訊

import org.apache.log4j.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

public class SocketSCMode {
    private static final Logger logger = Logger.getLogger(SocketSCMode.class);

    public static void main(String[] args) throws Exception{
        SocketSCMode leo = new SocketSCMode();
        leo.exe();
    }

    public void exe() throws InterruptedException{
        Thread server = new Thread(new ServerMsg());
        Thread client = new Thread(new ClientMsg());

        logger.info("Beginning to start the server for message");
        server.start();
        Thread.sleep(100);
        logger.info("Beginning to start the client for message");
        client.start();
    }

    class ClientMsg implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel socket = SocketChannel.open();
                socket.connect(new InetSocketAddress("127.0.0.1", 9999));
                socket.configureBlocking(false);
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                while (true) {
                    String datestr = format.format(new Date());
                    ByteBuffer buff = ByteBuffer.wrap(datestr.getBytes());
                    //logger.info("Sending the message " + datestr);
                    socket.write(buff);
                    Thread.sleep(2000);
                }
            } catch (IOException | InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }

    class ServerMsg implements Runnable {
        @Override
        public void run() {
            logger.info("Enter the server thread");
            try {
                ServerSocketChannel serverChannel = ServerSocketChannel.open();
                ServerSocket socket = serverChannel.socket();
                socket.bind(new InetSocketAddress(9999));
                serverChannel.configureBlocking(false);
                Selector selector = Selector.open();
                SelectionKey serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                long flag = 1L;

                while (true) {
                    int count = selector.select();
                    if (count > 0) {
                        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                        while (iter.hasNext()) {
                            SelectionKey key = iter.next();
                            logger.info("Flag-->" + flag + "   The interest is " + key.interestOps());
                            if (key.isAcceptable()) {
                                logger.info("Flag-->" + flag + "   The server accept one requesting");
                                iter.remove();
                                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                                SocketChannel client = server.accept();
                                client.configureBlocking(false);
                                client.register(selector, SelectionKey.OP_READ);
                            } else if (key.isReadable()) {
                                iter.remove();
                                logger.info("Flag-->" + flag + "   Reading the mesage");
                                SocketChannel client = (SocketChannel) key.channel();
                                ByteBuffer buff = ByteBuffer.allocate(1024);
                                int len = client.read(buff);
                                if (len > 0 && buff.hasArray()) {
                                    byte[] arr = buff.array();
                                    String msg = new String(arr, 0, len);
                                    logger.info("Flag-->" + flag + "   The recevied message is " + msg);
                                }
                            }
                        }
                    }
                    flag++;
                }
            } catch (IOException ie) {
                ie.printStackTrace();
            }
        }
    }
}

 邏輯上與第一個例子完全一樣,要注意的是iter.remove()這個操作。
 在該例子中,把if (key.isAcceptable())下的remove操作去掉,會報一個空指標異常。過程如下:
 1、把ServerSocketChannel註冊到selector上,啟動client
 2、第一次select,發現ServerSocketChannel是acceptable的,即有已選擇的鍵(不然程序會阻塞在select方法上),因些,返回遍歷SelectionKey的iter(已選擇鍵的集合)
 3、進入if (key.isAcceptable())下的程式碼塊,通過accept返回一個SocketChannel與client進行通訊,並把這個SocketChannel註冊到selector上。遍歷結束
 4、此時server端接收了client的連線,client開發傳送資料。server再執行select方法,發現SocketChannel是Readable,再次返回已選擇鍵的集合
 5、由於第一次遍歷,ServerSocketChannel沒有從已選擇鍵的集合去掉,並且,selector不會更新已選擇鍵的集合中鍵的read集合,因此,遍歷iter時,會再次進入if (key.isAcceptable()),而此時,沒有client進行連線請求,因些,accept方法返回的是一下null,也就出現了空指標異常。

 如果我們去掉程式中的第二個remove會發生什麼?程序會一直阻塞在select方法中,因為select方法是增量的,已在已選擇鍵的集合中的channel,不會去再處理。也就是說,select方法,只有當已選擇鍵的集合有新元素新增時才會返回(這麼理解有點不準確。。。)

例項三:多人聊天室

 這個例子是參照別人的程式碼寫的,當然,沒人家寫的好,修修補補的,總算能按大概的意思運行了,提供給大家做反面教材吧(-_-!!!)

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.*;

public class MultiplayerChat {

    static {
        Logger.getLogger(ServerMsgRoom.class).setLevel(Level.ERROR);
        Logger.getLogger(ClientMsgRoom.class).setLevel(Level.INFO);
    }

    public static void main(String[] args) throws Exception{
        Thread client1 = new Thread(new ClientMsgRoom(true), "leo");
        Thread client2 = new Thread(new ClientMsgRoom(true), "Makukin");
        Thread client3 = new Thread(new ClientMsgRoom(true), "Forrestleo");

        Thread server = new Thread(new ServerMsgRoom(), "ServerRoom");

        System.out.println("Start server ...");
        server.start();
        Thread.sleep(100);
        System.out.println("Start client");
        client1.start();
        client2.start();
        client3.start();
        server.join();
    }

    public static boolean canReadable(SocketChannel channel) {
        return (channel.validOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ;
    }

    public static boolean canWriteable(SocketChannel channel) {
        return (channel.validOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE;
    }
}

class ClientMsgRoom implements Runnable{
    private static final Logger logger = Logger.getLogger(ClientMsgRoom.class);
    private static final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");

    private String name;
    private String HOST;
    private int PORT;
    private ByteBuffer buff;
    private boolean pringFlag;

    public ClientMsgRoom(boolean flag) {
        HOST = "127.0.0.1";
        PORT = 9999;
        buff = ByteBuffer.allocate(1024);
        pringFlag = flag;
    }

    @Override
    public void run() {
        name = Thread.currentThread().getName();
        try {
            SocketChannel client = SocketChannel.open();
            client.connect(new InetSocketAddress(HOST, PORT));
            client.configureBlocking(false);

            String headMsg = "Register : " + name + "\n";
            logger.info(headMsg);
            buff.put(headMsg.getBytes());
            buff.flip();
            while(buff.hasRemaining()) {
                client.write(buff);
            }
            logger.info("Register the client ... ");
            while(true) {
                boolean flag = false;
                if(MultiplayerChat.canReadable(client)){
                    int len = 0;
                    buff.clear();
                    while((len = client.read(buff)) > 0) {
                        String msg = new String(buff.array(), 0, len);
                        if(pringFlag)
                            System.out.println(name + "-->\n" + msg);
                        flag = true;
                    }
                }

                if(flag && MultiplayerChat.canWriteable(client)) {
                    String reMsg = name + "'s time is " + format.format(new Date()) + "\n";
                    //logger.info("Sending message  " + reMsg);
                    buff.clear();
                    buff.put(reMsg.getBytes());
                    buff.flip();
                    while(buff.hasRemaining()) {
                        client.write(buff);
                    }
                }
                Thread.sleep(5000);
            }
        }catch(IOException | InterruptedException ie) {
            ie.printStackTrace();
        }
    }
}

class ServerMsgRoom implements Runnable {
    private static final Logger logger = Logger.getLogger(ServerMsgRoom.class);

    private int PORT;
    private ByteBuffer buff;
    private Vector<SelectionKey> chaters;

    public ServerMsgRoom() {
        PORT = 9999;
        buff = ByteBuffer.allocate(1024);
        chaters = new Vector<>();
    }

    @Override
    public void run() {
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(PORT));
            Selector selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);

            while(true) {
                int count = selector.select();
                if(count <= 0)
                    continue;

                logger.info("Begin to ack the keys and iter's size is " + count);
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while(iter.hasNext()) {
                    SelectionKey key = iter.next();
                    if(key.isAcceptable()) {
                        ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
                        SocketChannel socket = serverChannel.accept();
                        logger.info("Add one register");
                        socket.configureBlocking(false);
                        SelectionKey rkey = socket.register(selector, SelectionKey.OP_READ);
                        chaters.add(rkey);
                    }else if(key.isValid() && key.isReadable()) {
                        SocketChannel clientChannel = (SocketChannel)key.channel();
                        int len = 0;
                        StringBuilder builder = new StringBuilder();
                        buff.clear();
                        while((len = clientChannel.read(buff)) > 0) {
                            String msg = new String(buff.array(), 0 , len);
                            builder.append(msg);
                        }
                        logger.info("PrintMessage " + builder.toString());
                        Iterator<SelectionKey> iterWriter = chaters.iterator();
                        while(iterWriter.hasNext()) {
                            SelectionKey keyWriter = iterWriter.next();
                            List<String> list = (List<String>) keyWriter.attachment();
                            if (list == null) {
                                list = new ArrayList<>();
                                keyWriter.attach(list);
                            }
                            list.add(builder.toString());
                        }
                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                    }else if(key.isValid() && key.isWritable()) {
                        List<String> list = (List<String>) key.attachment();
                        SocketChannel writer = (SocketChannel) key.channel();
                        if(list != null) {
                            logger.info("The attaching list is " + list.toString());
                            Iterator<String> iterReader = list.iterator();
                            while(iterReader.hasNext()) {
                                String message = iterReader.next();
                                buff.clear();
                                buff.put(message.getBytes());
                                buff.flip();
                                while(buff.hasRemaining()) {
                                    writer.write(buff);
                                }
                                logger.info("Writing message is " + message);
                                iterReader.remove();
                            }
                        }else{
                            logger.warn("The list is null");
                        }
                        key.interestOps(SelectionKey.OP_READ);
                    }
                    iter.remove();
                }
            }
        }catch(IOException ie) {
            ie.printStackTrace();
        }

    }
}