1. 程式人生 > >java NIO selector例項

java NIO selector例項

  1. 基礎父類

需要進行selector nio程式設計的朋友,可以直接繼承AbstractSelectorChannel 使用。
ByteBufferUtil在另一篇部落格中有原始碼

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by Thinkpad on 2018/8/28.
 */
public abstract class
AbstractSelectorChannel {
private String adress; private int port; private boolean block; private SelectableChannel selectableChannel; private Selector selector; //停止狀態 private boolean stop; //暫停狀態 private boolean pause; //暫停時間,單位毫秒 private long pauseTime; //讀取字元區
private ByteBuffer readBuff; public AbstractSelectorChannel(String adress ,int port,boolean block){ this.adress = adress; this.port = port; this.block = block; selectableChannel = initSelectableChannel(); } public AbstractSelectorChannel(SelectableChannel selectableChannel){ this
.selectableChannel = selectableChannel; } /** * 初始化SelectableChannel */ public abstract SelectableChannel initSelectableChannel(); /** * 初始化 */ public void init(){ if(readBuff == null){ readBuff = ByteBuffer.allocate(1024); } if(selector == null){ selector = open(0); } if(selectableChannel == null){ selectableChannel = initSelectableChannel(); } } /** * 開啟選擇通道 * ServerSocketChannel只能註冊OP_ACCEPT * DatagramChannel可以註冊OP_READ| OP_WRITE * SocketChannel可以註冊OP_CONNECT|OP_READ| OP_WRITE * @param ops */ public Selector open(int ops){ try { Selector selector = Selector.open(); //如果為0預設取OP_ACCEPT ops = ops == 0 ? SelectionKey.OP_ACCEPT : ops; selectableChannel.register(selector,ops); this.selector = selector; return selector; } catch (IOException e) { e.printStackTrace(); try { selector.close(); } catch (IOException e1) { e1.printStackTrace(); } } return null; } public void select(int pos){ selector = open(pos); select(); } /** * 開始選擇通道 */ public void select(){ init(); try { //Channel必須是非阻塞的。所以FileChannel不適用Selector selectableChannel.configureBlocking(false); while (!isStop()) { isPause(); System.out.println("開始通道選擇"); selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if(key.isConnectable()){ System.out.println("開始連線"); SelectableChannel selectableChannel = connectHandle(key); if(selectableChannel.isOpen()){ selectableChannel.register(selector, SelectionKey.OP_ACCEPT); } }else if (key.isAcceptable()) { System.out.println("開始接收"); SelectableChannel selectableChannel = acceptHandle(key); if(selectableChannel.isOpen()){ selectableChannel.configureBlocking(false); selectableChannel.register(selector, SelectionKey.OP_READ); } }else if (key.isReadable()) { System.out.println("開始讀取"); SelectableChannel selectableChannel = readHandle(key); if(selectableChannel.isOpen()){ key.interestOps(SelectionKey.OP_WRITE); } }else if (key.isWritable()) { System.out.println("開始寫入"); SelectableChannel selectableChannel = writeHandle(key); if(selectableChannel.isOpen()){ key.interestOps(SelectionKey.OP_READ); } } } } } catch (IOException e) { e.printStackTrace(); }finally { try { selector.close(); selectableChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } public void setStop(boolean flag){ this.stop = flag; } private boolean isStop(){ return stop; } private void isPause(){ if(pause){ try { Thread.sleep(pauseTime); } catch (InterruptedException e) { e.printStackTrace(); }finally { pause = false; pauseTime = 0; } } } public void pause(long time){ if(time == 0){ return; } pauseTime = time; pause = true; } /** * 連結處理 * @param selectionKey * @return */ public SelectableChannel connectHandle(SelectionKey selectionKey) { return selectionKey.channel(); } /** * 接收處理 * @param selectionKey * @return */ public SelectableChannel acceptHandle(SelectionKey selectionKey) { return selectionKey.channel(); } /** * 寫入資料 * @return */ public abstract String writeData(); /** * 寫入處理 * @param selectionKey * @return */ public SelectableChannel writeHandle(SelectionKey selectionKey) { WritableByteChannel socketChannel = (WritableByteChannel) selectionKey.channel(); try { ByteBuffer byteBuffer = ByteBufferUtil.write(writeData()); //讀取前指標切換 byteBuffer.flip(); socketChannel.write(byteBuffer); System.out.println("寫入:"+writeData()); } catch (IOException e) { e.printStackTrace(); try { socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } } return selectionKey.channel(); } /** * 讀取資料 * @param data */ public abstract void readData(String data); /** * 讀取處理 * @param selectionKey * @return */ public SelectableChannel readHandle(SelectionKey selectionKey) { ReadableByteChannel socketChannel = (ReadableByteChannel) selectionKey.channel(); try { socketChannel.read(readBuff); String data = ByteBufferUtil.readString(readBuff); readData(data); System.out.println("received : " + data); } catch (IOException e) { e.printStackTrace(); try { socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } } return selectionKey.channel(); } public String getAdress() { return adress; } public int getPort() { return port; } public boolean isBlock() { return block; } public SelectableChannel getSelectableChannel() { return selectableChannel; } public Selector getSelector() { return selector; } }
  1. 服務端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by Thinkpad on 2018/8/27.
 */
public class SelectorService extends AbstractSelectorChannel{

    public SelectorService(String adress ,int port,boolean block){
        super(adress,port,block);
    }

    @Override
    public SelectableChannel initSelectableChannel() {
        ServerSocketChannel ssc = null;
        try {
            ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(getAdress(),getPort()));
            ssc.configureBlocking(isBlock());
            return ssc;
        } catch (IOException e) {
            e.printStackTrace();
            if(ssc != null){
                try {
                    ssc.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return null;
    }

    @Override
    public SelectableChannel acceptHandle(SelectionKey selectionKey) {
        // 建立新的連線,並且把連線註冊到selector上,而且,
        // 宣告這個channel只對讀操作感興趣。
        ServerSocketChannel ssc = (ServerSocketChannel)selectionKey.channel();
        SocketChannel socketChannel = null;
        try {
            socketChannel = ssc.accept();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return socketChannel;
    }

    @Override
    public String writeData() {
        return "你好1";
    }


    @Override
    public void readData(String data) {
        //讀取資料後操作
    }

    public static void main(String[] args) {
        SelectorService selectorService = new SelectorService("127.0.0.1",8000,false);
        selectorService.select();
    }
}
  1. 客服端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * Created by Thinkpad on 2018/8/27.
 */
public class SelectorClient extends AbstractSelectorChannel{

    public SelectorClient(String adress ,int port,boolean block){
        super(adress,port,block);
    }

    public static void main(String[] args) throws IOException {
        SelectorClient selectorClient = new SelectorClient("127.0.0.1",8000,false);
        selectorClient.select(SelectionKey.OP_WRITE);
    }

    @Override
    public SelectableChannel initSelectableChannel() {
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));
            socketChannel.configureBlocking(isBlock());
            return socketChannel;
        } catch (IOException e) {
            e.printStackTrace();
            if(socketChannel != null){
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
        ;return null;
    }

    @Override
    public String writeData() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "你好,我是客戶端";
    }

    @Override
    public void readData(String data) {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("客戶端讀取到資料:"+data);
    }
}