java NIO selector例項
阿新 • • 發佈:2018-11-27
- 基礎父類
需要進行selector nio程式設計的朋友,可以直接繼承AbstractSelectorChannel 使用。
ByteBufferUtil在另一篇部落格中有原始碼
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
 * Base class for a simple single-threaded NIO selector loop.
 *
 * <p>A subclass supplies the channel ({@link #initSelectableChannel()}) and the
 * payload callbacks ({@link #writeData()} / {@link #readData(String)}); this class
 * owns the {@link Selector}, runs the select loop and dispatches ready keys to the
 * {@code *Handle} methods. After a read the key is switched to {@code OP_WRITE} and
 * after a write back to {@code OP_READ}, so each connection alternates between the two.
 *
 * <p>{@code ByteBufferUtil} is a helper that is not defined in this file.
 *
 * <p>Created by Thinkpad on 2018/8/28.
 */
public abstract class AbstractSelectorChannel {
    /** Capacity, in bytes, of the read buffer allocated by {@link #init()}. */
    private static final int READ_BUFFER_SIZE = 1024;

    private String adress;
    private int port;
    private boolean block;
    private SelectableChannel selectableChannel;
    // volatile: setStop() reads it from a thread other than the one running select().
    private volatile Selector selector;
    // Stop flag; volatile so a request made from another thread is seen by the loop.
    private volatile boolean stop;
    // Pause flag, consumed (and reset) by the loop before the next select().
    private volatile boolean pause;
    // Pause duration in milliseconds.
    private volatile long pauseTime;
    // Buffer shared by every read performed in readHandle().
    private ByteBuffer readBuff;

    /**
     * Stores the connection settings and immediately creates the channel.
     *
     * <p>Note that {@link #initSelectableChannel()} runs from this constructor, i.e.
     * before any field initializer of the subclass has executed; implementations may
     * only rely on {@link #getAdress()}, {@link #getPort()} and {@link #isBlock()}.
     *
     * @param adress host name or address handed to the subclass
     * @param port   port handed to the subclass
     * @param block  blocking mode requested by the caller
     */
    public AbstractSelectorChannel(String adress, int port, boolean block) {
        this.adress = adress;
        this.port = port;
        this.block = block;
        selectableChannel = initSelectableChannel();
    }

    /**
     * Wraps an already created channel.
     *
     * @param selectableChannel channel to run the selector loop on
     */
    public AbstractSelectorChannel(SelectableChannel selectableChannel) {
        this.selectableChannel = selectableChannel;
    }

    /**
     * Creates the channel that will be registered with the selector.
     *
     * @return the channel, or {@code null} when it could not be created
     */
    public abstract SelectableChannel initSelectableChannel();

    /**
     * Lazily creates the read buffer, the channel and the selector.
     *
     * <p>The channel is resolved before the selector because {@link #open(int)}
     * registers the channel.
     */
    public void init() {
        if (readBuff == null) {
            readBuff = ByteBuffer.allocate(READ_BUFFER_SIZE);
        }
        if (selectableChannel == null) {
            selectableChannel = initSelectableChannel();
        }
        if (selector == null) {
            selector = open(0);
        }
    }

    /**
     * Opens a selector and registers the channel with it.
     *
     * <p>Valid interest sets: {@code ServerSocketChannel} only supports
     * {@code OP_ACCEPT}; {@code DatagramChannel} supports {@code OP_READ | OP_WRITE};
     * {@code SocketChannel} supports {@code OP_CONNECT | OP_READ | OP_WRITE}.
     *
     * @param ops interest set; {@code 0} means {@code OP_ACCEPT}
     * @return the new selector, or {@code null} when the channel is unavailable or an
     *         {@link IOException} occurred (the stack trace is printed)
     */
    public Selector open(int ops) {
        if (selectableChannel == null) {
            selectableChannel = initSelectableChannel();
        }
        if (selectableChannel == null) {
            // Channel creation failed; there is nothing to register.
            return null;
        }
        Selector opened = null;
        boolean registered = false;
        try {
            opened = Selector.open();
            int interest = ops == 0 ? SelectionKey.OP_ACCEPT : ops;
            // A channel must be non-blocking before it can be registered
            // (which is also why FileChannel cannot be used with a Selector).
            selectableChannel.configureBlocking(false);
            selectableChannel.register(opened, interest);
            this.selector = opened;
            registered = true;
            return opened;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the selector created by THIS call on any failure, including
            // unchecked ones such as an interest set the channel does not support.
            if (!registered) {
                closeSelector(opened);
            }
        }
        return null;
    }

    /**
     * Registers the channel with the given interest set, then runs the loop.
     *
     * @param pos interest set passed to {@link #open(int)}
     */
    public void select(int pos) {
        selector = open(pos);
        select();
    }

    /**
     * Runs the select loop until {@link #setStop(boolean) setStop(true)} is called,
     * the running thread is interrupted, or an {@link IOException} escapes a handler.
     * The selector and the channel are closed when the loop ends.
     */
    public void select() {
        init();
        Selector active = selector;
        try {
            if (active == null) {
                // open() failed and already reported why; finally still frees the channel.
                return;
            }
            // An interrupted thread makes every select() return immediately, so
            // interruption is treated as a stop request instead of spinning.
            while (!isStop() && !Thread.currentThread().isInterrupted()) {
                isPause();
                System.out.println("開始通道選擇");
                active.select();
                Iterator<SelectionKey> ready = active.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    ready.remove();
                    dispatch(key);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Closed independently so a failure on one does not leak the other.
            closeSelector(active);
            closeChannel(selectableChannel);
        }
    }

    /** Routes one ready key to the matching handler and updates its interest set. */
    private void dispatch(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isConnectable()) {
            System.out.println("開始連線");
            SelectableChannel connected = connectHandle(key);
            if (isUsable(connected) && key.isValid() && !isConnectionPending(connected)) {
                // Connection established: stop watching OP_CONNECT. Keep whatever else
                // the caller registered; with nothing left, start with a write, as the
                // read/write ping-pong below needs one side to go first.
                int remaining = key.interestOps() & ~SelectionKey.OP_CONNECT;
                key.interestOps(remaining != 0 ? remaining : SelectionKey.OP_WRITE);
            }
        } else if (key.isAcceptable()) {
            System.out.println("開始接收");
            SelectableChannel accepted = acceptHandle(key);
            // null: the handler had no connection to hand over (or accepting failed).
            if (isUsable(accepted)) {
                accepted.configureBlocking(false);
                accepted.register(key.selector(), SelectionKey.OP_READ);
            }
        } else if (key.isReadable()) {
            System.out.println("開始讀取");
            SelectableChannel source = readHandle(key);
            if (isUsable(source) && key.isValid()) {
                key.interestOps(SelectionKey.OP_WRITE);
            }
        } else if (key.isWritable()) {
            System.out.println("開始寫入");
            SelectableChannel sink = writeHandle(key);
            if (isUsable(sink) && key.isValid()) {
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    /** True when a handler returned a channel that is still open. */
    private static boolean isUsable(SelectableChannel channel) {
        return channel != null && channel.isOpen();
    }

    /** True when the channel is a socket whose non-blocking connect has not finished. */
    private static boolean isConnectionPending(SelectableChannel channel) {
        return channel instanceof SocketChannel
                && ((SocketChannel) channel).isConnectionPending();
    }

    /** Closes a selector, printing (not propagating) any failure; null is ignored. */
    private static void closeSelector(Selector target) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes a channel, printing (not propagating) any failure; null is ignored. */
    private static void closeChannel(Channel target) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Requests the loop to stop (or clears a previous request).
     *
     * <p>When stopping, the selector is woken up so that a thread blocked in
     * {@code select()} notices the flag without waiting for the next I/O event.
     *
     * @param flag {@code true} to stop the loop
     */
    public void setStop(boolean flag) {
        this.stop = flag;
        Selector current = selector;
        if (flag && current != null && current.isOpen()) {
            current.wakeup();
        }
    }

    private boolean isStop() {
        return stop;
    }

    /** Sleeps once for the requested pause, then clears the pause request. */
    private void isPause() {
        if (pause) {
            try {
                Thread.sleep(pauseTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the flag so the select loop can see the interruption and exit.
                Thread.currentThread().interrupt();
            } finally {
                pause = false;
                pauseTime = 0;
            }
        }
    }

    /**
     * Asks the loop to sleep once before its next {@code select()}.
     *
     * @param time pause in milliseconds; zero or negative values are ignored
     *             (a negative value would make {@code Thread.sleep} throw)
     */
    public void pause(long time) {
        if (time <= 0) {
            return;
        }
        pauseTime = time;
        pause = true;
    }

    /**
     * Connect handling. For a {@link SocketChannel} with a pending non-blocking
     * connect this completes the connection; on failure the stack trace is printed
     * and the channel is closed.
     *
     * @param selectionKey key that reported {@code OP_CONNECT}
     * @return the key's channel
     */
    public SelectableChannel connectHandle(SelectionKey selectionKey) {
        SelectableChannel channel = selectionKey.channel();
        if (channel instanceof SocketChannel) {
            SocketChannel socketChannel = (SocketChannel) channel;
            try {
                if (socketChannel.isConnectionPending()) {
                    socketChannel.finishConnect();
                }
            } catch (IOException e) {
                e.printStackTrace();
                closeChannel(socketChannel);
            }
        }
        return channel;
    }

    /**
     * Accept handling. The loop makes the returned channel non-blocking and registers
     * it for {@code OP_READ}; returning {@code null} skips that step.
     *
     * <p>This default only returns the key's own channel. A server built on
     * {@code ServerSocketChannel} has to override it and return the accepted
     * connection, because a listening channel cannot be registered for
     * {@code OP_READ}.
     *
     * @param selectionKey key that reported {@code OP_ACCEPT}
     * @return the channel to register for reading
     */
    public SelectableChannel acceptHandle(SelectionKey selectionKey) {
        return selectionKey.channel();
    }

    /**
     * Supplies the text to send on the next write.
     *
     * @return the data to write
     */
    public abstract String writeData();

    /**
     * Write handling: sends {@link #writeData()} over the key's channel. On an
     * {@link IOException} the stack trace is printed and the channel is closed.
     *
     * @param selectionKey key that reported {@code OP_WRITE}
     * @return the key's channel
     */
    public SelectableChannel writeHandle(SelectionKey selectionKey) {
        WritableByteChannel socketChannel = (WritableByteChannel) selectionKey.channel();
        try {
            // Ask for the payload exactly once: the callback may be slow or return a
            // different value on each call, and the log line must match what was sent.
            String payload = writeData();
            ByteBuffer byteBuffer = ByteBufferUtil.write(payload);
            // Switch the buffer from filling to draining before handing it to the channel.
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            System.out.println("寫入:" + payload);
        } catch (IOException e) {
            e.printStackTrace();
            closeChannel(socketChannel);
        }
        return selectionKey.channel();
    }

    /**
     * Receives the text decoded from one read.
     *
     * @param data the data that was read
     */
    public abstract void readData(String data);

    /**
     * Read handling: reads from the key's channel into the shared buffer and passes
     * the decoded text to {@link #readData(String)}. The channel is closed when the
     * peer has closed the connection or an {@link IOException} occurs.
     *
     * @param selectionKey key that reported {@code OP_READ}
     * @return the key's channel
     */
    public SelectableChannel readHandle(SelectionKey selectionKey) {
        ReadableByteChannel socketChannel = (ReadableByteChannel) selectionKey.channel();
        try {
            if (readBuff == null) {
                readBuff = ByteBuffer.allocate(READ_BUFFER_SIZE);
            }
            // Start every read from an empty buffer, independent of the state the
            // previous decode left it in.
            // NOTE(review): ByteBufferUtil.readString is not visible here - confirm it
            // expects a freshly filled, un-flipped buffer.
            readBuff.clear();
            int count = socketChannel.read(readBuff);
            if (count < 0) {
                // End of stream: the peer closed the connection.
                closeChannel(socketChannel);
                return selectionKey.channel();
            }
            String data = ByteBufferUtil.readString(readBuff);
            readData(data);
            System.out.println("received : " + data);
        } catch (IOException e) {
            e.printStackTrace();
            closeChannel(socketChannel);
        }
        return selectionKey.channel();
    }

    public String getAdress() {
        return adress;
    }

    public int getPort() {
        return port;
    }

    public boolean isBlock() {
        return block;
    }

    public SelectableChannel getSelectableChannel() {
        return selectableChannel;
    }

    public Selector getSelector() {
        return selector;
    }
}
- 服務端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
 * Demo server built on {@code AbstractSelectorChannel}: listens on the configured
 * address and port, accepts incoming connections and answers with a fixed greeting.
 *
 * <p>Created by Thinkpad on 2018/8/27.
 */
public class SelectorService extends AbstractSelectorChannel {

    public SelectorService(String adress, int port, boolean block) {
        super(adress, port, block);
    }

    /** Starts the demo server on 127.0.0.1:8000 and runs its select loop. */
    public static void main(String[] args) {
        new SelectorService("127.0.0.1", 8000, false).select();
    }

    /**
     * Opens the listening channel, binds it to {@code getAdress()}:{@code getPort()}
     * and applies the blocking mode from {@code isBlock()}.
     *
     * @return the bound channel, or {@code null} when opening, binding or configuring
     *         failed (the stack trace is printed and the channel is closed)
     */
    @Override
    public SelectableChannel initSelectableChannel() {
        final ServerSocketChannel listener;
        try {
            listener = ServerSocketChannel.open();
        } catch (IOException openFailure) {
            // Nothing was opened, so there is nothing to release.
            openFailure.printStackTrace();
            return null;
        }

        try {
            InetSocketAddress endpoint = new InetSocketAddress(getAdress(), getPort());
            listener.socket().bind(endpoint);
            listener.configureBlocking(isBlock());
        } catch (IOException setupFailure) {
            setupFailure.printStackTrace();
            try {
                listener.close();
            } catch (IOException closeFailure) {
                closeFailure.printStackTrace();
            }
            return null;
        }
        return listener;
    }

    /**
     * Accepts the pending connection on the listening channel. The caller registers
     * the returned channel with the selector for read events.
     *
     * @param selectionKey key of the listening channel
     * @return the accepted connection; {@code null} when accepting failed (the stack
     *         trace is printed) or when {@code accept()} itself returned {@code null}
     */
    @Override
    public SelectableChannel acceptHandle(SelectionKey selectionKey) {
        ServerSocketChannel listener = (ServerSocketChannel) selectionKey.channel();
        try {
            return listener.accept();
        } catch (IOException acceptFailure) {
            acceptFailure.printStackTrace();
            return null;
        }
    }

    /** Returns the fixed reply sent to clients. */
    @Override
    public String writeData() {
        return "你好1";
    }

    /** Hook invoked with the data that was read; the demo server does nothing with it. */
    @Override
    public void readData(String data) {
        // Intentionally empty: place post-read processing here.
    }
}
- 客戶端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
 * Demo client built on {@code AbstractSelectorChannel}: connects to the configured
 * address and port, then alternates between sending a greeting and printing the reply.
 *
 * <p>Created by Thinkpad on 2018/8/27.
 */
public class SelectorClient extends AbstractSelectorChannel {
    /** Delay, in milliseconds, inserted before each send and each received message. */
    private static final long DEMO_DELAY_MILLIS = 5000L;

    public SelectorClient(String adress, int port, boolean block) {
        super(adress, port, block);
    }

    /** Connects to 127.0.0.1:8000 and starts the loop with a write. */
    public static void main(String[] args) throws IOException {
        SelectorClient selectorClient = new SelectorClient("127.0.0.1", 8000, false);
        selectorClient.select(SelectionKey.OP_WRITE);
    }

    /**
     * Opens a socket channel, connects it to {@code getAdress()}:{@code getPort()}
     * (the values given to the constructor) and then applies the blocking mode from
     * {@code isBlock()}.
     *
     * @return the connected channel, or {@code null} when opening, connecting or
     *         configuring failed (the stack trace is printed and the channel is closed)
     */
    @Override
    public SelectableChannel initSelectableChannel() {
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            // Use the configured endpoint rather than a hard-coded one, so the
            // constructor arguments are actually honoured.
            socketChannel.connect(new InetSocketAddress(getAdress(), getPort()));
            socketChannel.configureBlocking(isBlock());
            return socketChannel;
        } catch (IOException e) {
            e.printStackTrace();
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return null;
    }

    /**
     * Waits for the demo delay, then returns the greeting to send.
     *
     * @return the greeting text
     */
    @Override
    public String writeData() {
        demoDelay();
        return "你好,我是客戶端";
    }

    /**
     * Waits for the demo delay, then prints the received data.
     *
     * @param data the data read from the server
     */
    @Override
    public void readData(String data) {
        demoDelay();
        System.out.println("客戶端讀取到資料:" + data);
    }

    /**
     * Sleeps for {@link #DEMO_DELAY_MILLIS}. An interruption is treated as a request
     * to shut down: the interrupt flag is restored and the select loop is asked to stop.
     */
    private void demoDelay() {
        try {
            Thread.sleep(DEMO_DELAY_MILLIS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
            // With the flag set every further sleep/select would return at once,
            // so end the loop instead of spinning.
            setStop(true);
        }
    }
}