java NIO模擬 Netty執行緒模型
阿新 • • 發佈:2018-12-21
兩個類,NioAcceptor,處理連線 單執行緒。NioReactor處理讀寫,多執行緒。
解釋放在程式碼中
NioAcceptor
package com.zwj.myNio; import java.io.IOException; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Set; public class NioAcceptor { private volatile Selector selector; private final ServerSocketChannel serverSocketChannel; private String iP; private int port; // 對於以字元方式讀取和處理的資料必須要進行字符集編碼和解碼 String encoding = System.getProperty("file.encoding"); // 載入位元組編碼集 Charset charse = Charset.forName(encoding); private final NioReactor[] nioReactors; private volatile int nextReactor; public NioAcceptor(String ip, int port, int threadN) { if (ip == null || ip == "") { throw new IllegalArgumentException(); } this.iP = ip; if (port < 1) { throw new IllegalArgumentException(); } this.port = port; if (threadN < 1) { throw new IllegalArgumentException(); } //新建處理讀寫的執行緒池 nioReactors = new NioReactor[threadN]; for (int i = 0; i < threadN; i++) { nioReactors[i] = new NioReactor(); nioReactors[i].start(); } try { this.selector = Selector.open(); this.serverSocketChannel = ServerSocketChannel.open(); this.serverSocketChannel.configureBlocking(false); /** 設定TCP屬性 */ serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2); // backlog=100 serverSocketChannel.bind(new InetSocketAddress(ip, port), 100); this.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("server start!"); } catch (IOException e) { e.printStackTrace(); throw new IllegalArgumentException(); } } private NioReactor nextNioReactor() { int i = nextReactor++; if (i >= nioReactors.length) { i = nextReactor = 0; } return nioReactors[i]; } public void run() { while (true) { try { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); if (!keys.isEmpty()) { for (SelectionKey key : keys) { keys.remove(key); if (key.isValid() && key.isAcceptable()) { SocketChannel ch = serverSocketChannel.accept(); ch.configureBlocking(false); String tomessage = "welcome,this is server!"; try { ch.write(charse.encode(tomessage)); } catch (IOException e) { e.printStackTrace(); } //把這個通道交給Reactor處理,獲取NioReactor的方式非常原始就是輪詢這個陣列,從0開始那,到了末尾就又從0開始 NioReactor nioReactor = nextNioReactor(); nioReactor.postRegister(ch); } else { //acceptor 只接受 accept事件,如果有其他事件 把這個通道從selector 移除 key.cancel(); } } } } catch (IOException e) { e.printStackTrace(); } } } }
NioReactor 內部的worker 是一個 runnable 不斷的select 檢查就緒的讀事件。同時有一儲存了該worker需要處理通道的佇列。詳細解釋看程式碼註釋
package com.zwj.myNio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; public class NioReactor { final String encoding = System.getProperty("file.encoding"); final Charset charse = Charset.forName(encoding); private final Worker worker; public NioReactor() { //新建一個worker,worker 是一個 任務,實現 了Runnable介面他的功能就是不斷的輪訓註冊在他的selector 上的通過(channel)就緒讀事件 this.worker = new Worker(); } public void postRegister(SocketChannel socketChannel) { //把 通道放到worker 的register佇列上面,每次worker輪訓的時候會去檢查這個佇列,如果有新的通道,就把通道註冊到自己的selector this.worker.registerQueue.add(socketChannel); this.worker.selector.wakeup(); } public void start() { //啟動一個執行緒來執行worker new Thread(worker).start(); } private class Worker implements Runnable { private volatile Selector selector; private ConcurrentLinkedQueue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>(); public Worker() { try { this.selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } private void register(Selector selector) { if (registerQueue.isEmpty()) { return; } SocketChannel socketChannel = null; while ((socketChannel = registerQueue.poll()) != null) { try { socketChannel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } } @Override public void run() { while (true) { try { register(selector); selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { keys.remove(key); if (key.isValid() && key.isReadable()) { SocketChannel rchannel = null; try { rchannel = (SocketChannel) key.channel(); ByteBuffer readByteBuffer = ByteBuffer.allocate(2048); String content = ""; rchannel.read(readByteBuffer); readByteBuffer.flip(); content += charse.decode(readByteBuffer); //to do business System.out.println("server rec:" + content); String tomessage = "this is server!i have rec you mess"; rchannel.write(charse.encode(tomessage)); } catch (IOException e) { if (rchannel != null) { key.cancel(); rchannel.close(); } } } } } catch (Exception e) { e.printStackTrace(); } } } } }
最後一個 客戶端程式 來連線 服務
package com.zwj.myNio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
public class NIOClient {
private static final int SIZE = 1024;
private static NIOClient instance = new NIOClient();
public String IP = "127.0.0.1";// 10.50.200.120
public int CLIENT_PORT = 8090;// 4444 9666
private SocketChannel channel;
private Selector selector = null;
String encoding = System.getProperty("file.encoding");
Charset charset = Charset.forName(encoding);
private NIOClient() {
}
public static NIOClient getInstance() {
return instance;
}
public void send(String content) throws IOException {
selector = Selector.open();
channel = SocketChannel.open();
// channel = SocketChannel.open(new InetSocketAddress(IP,CLIENT_PORT));
InetSocketAddress remote = new InetSocketAddress(IP, CLIENT_PORT);
channel.connect(remote);
// 設定該sc以非阻塞的方式工作
channel.configureBlocking(false);
// 將SocketChannel物件註冊到指定的Selector
// SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
channel.register(selector, SelectionKey.OP_READ);//這裡註冊的是read讀,即從服務端讀資料過來
// 啟動讀取伺服器資料端的執行緒
new ClientThread().start();
channel.write(charset.encode(content));
// 建立鍵盤輸入流
Scanner scan = new Scanner(System.in);//這裡向服務端傳送資料,同時啟動了一個鍵盤監聽器
while (scan.hasNextLine()) {
System.out.println("輸入資料:\n");
// 讀取鍵盤的輸入
String line = scan.nextLine();
// 將鍵盤的內容輸出到SocketChanenel中
channel.write(charset.encode(line));
}
scan.close();
}
/**
* 從服務端讀入資料的執行緒
*
* @author 王俊偉 [email protected]
* @date 2016年10月20日 下午9:59:11
*/
private class ClientThread extends Thread {
@Override
public void run() {
try {
while (selector.select() > 0) {
// 遍歷每個有可能的IO操作的Channel對銀行的SelectionKey
for (SelectionKey sk : selector.selectedKeys()) {
// 刪除正在處理的SelectionKey
selector.selectedKeys().remove(sk);
// 如果該SelectionKey對應的Channel中有可讀的資料
if (sk.isReadable()) {
// 使用NIO讀取Channel中的資料
SocketChannel sc = (SocketChannel) sk.channel();
String content = "";
ByteBuffer bff = ByteBuffer.allocate(SIZE);
while (sc.read(bff) > 0) {
sc.read(bff);
bff.flip();
content += charset.decode(bff);
}
// 列印讀取的內容
System.out.println("服務端返回資料:" + content);
// 處理下一次讀
sk.interestOps(SelectionKey.OP_READ);
}
}
}
} catch (IOException io) {
io.printStackTrace();
}
}
}
/**
* 接受服務端的資料
*
* @param channel
* @return
* @throws Exception
*/
protected void receiveData(SocketChannel channel) throws Exception {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
int count = 0;
while ((count = channel.read(buffer)) != -1) {
if (count == 0) {
Thread.sleep(100); // 等等一下
continue;
}
// 轉到最開始
buffer.flip();
while (buffer.remaining() > 0) {
System.out.print((char) buffer.get());
}
buffer.clear();
}
}
public static void main(String[] args) {
try {
NIOClient nio = new NIOClient();
nio.send("test");//向服務端傳送資料
//nio.send("metrics:memory: swap: cpu: network i/o: disks i/o: tcp:\n");
} catch (IOException e) {
e.printStackTrace();
}
}
}
服務啟動的main 程式
package com.zwj.myNio;
public class MyNio {
public static void main(String[] args) {
// write your code here
NioAcceptor nioAcceptor = new NioAcceptor("127.0.0.1", 8090, 1);
nioAcceptor.run();
}
}