Java 標準IO框架與NIO框架詳解
阿新 • • 發佈:2019-01-04
在看這篇文章之前,可以先去看看我部落格中另一篇關於同步與非同步、阻塞與非阻塞的理解
Java標準IO(BIO)
BIO全稱Blocking IO又叫做同步阻塞IO,它存在如下特點:
- 面向流
- 同步
- 阻塞
package com.xdong.bio.client; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; public class BioClient { public static void main(String[] args) { Socket client = null; InputStream inputStream = null; OutputStream outputStream = null; try { client = new Socket("127.0.0.1", 8888); inputStream = client.getInputStream(); outputStream = client.getOutputStream(); outputStream.write("Hello Server I am conneting you!".getBytes()); outputStream.flush(); byte[] bytes = new byte[1024]; while (inputStream.read(bytes) > 0) {//這裡必須一次讀完才能進行下次讀操作,否則下一次會將上一次的 String msg = new String(bytes).trim(); System.out.println("client receive the message of server:" + msg); outputStream.write("client get the message !".getBytes()); outputStream.flush(); Thread.sleep(1*1000); } } catch (Exception e) { // TODO: handle exception } } }
package com.xdong.bio.server; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class BioServer { private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(5); public static void main(String[] args) { ServerSocket server = null; Socket socket = null; try { server = new ServerSocket(8888); while (!Thread.interrupted()) { socket = server.accept(); THREAD_POOL.submit(new SocketHandler(socket)); } } catch (Exception e) { // TODO: handle exception } } static class SocketHandler implements Runnable{ private Socket socket; public SocketHandler(Socket socket) { super(); this.socket = socket; } @Override public void run() { InputStream inputStream = null; OutputStream outputStream = null; try { inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); byte[] bs = new byte[1024]; while (true) { int available = inputStream.available();// 輸入流中有多少可用位元組 inputStream.read(bs, 0, available);// 從輸出流中將資料寫入位元組陣列(注:如果位元組陣列原本有100個位元組,本次讀取50個位元組,那麼它只會覆蓋前面50個位元組資料,位元組陣列中還是有100個位元組資料) String receive = new String(bs, 0,available).trim(); System.out.println("server receive the message of client:" + receive); outputStream.write("server get the message!".getBytes());//write函式-阻塞 outputStream.flush(); Thread.sleep(1*1000); } } catch (Exception e) { e.printStackTrace(); } } } }
簡單分析一下上面程式碼:
- 繫結一個埠,產生一個SocketServer物件,並呼叫accept()方法,監聽這個埠,接收客戶端連線。
- 當有客戶端接入後,服務端單獨起一個執行緒,利用socket與客戶端互動。
- 利用socket.getInputStream流接收客戶端傳送的訊息,利用socket.getOutputStream流傳送訊息給客戶端。
- 注意:server.accept、inpustream.read、outputstream.write都會阻塞執行緒。
- 同步體現在服務端與客戶端的互動只能一步步順序進行。阻塞主要體現在第4點中的幾個方法。即使採用了多執行緒技術,實現了主執行緒非阻塞,但是子執行緒中與客戶端互動依舊是阻塞的。
Java NIO框架
NIO,全稱是Non-Blocking IO或New IO,解釋就是非阻塞IO和新版IO。下面將給出三份不同的Server端程式碼,分別表示為:單Selector-單執行緒模式、單Selector-多執行緒模式、多Selector-多執行緒模式。
NIO的特性如下:
- 面向緩衝區
- 同步
- 非阻塞
package com.xdong.nio.v1.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* v1=單執行緒、單reactor
* @author xiongchaoqun
* @date 2018年7月2日
*/
public class NioServer {
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;// 宣告服務端Channel
try {
serverSocketChannel = ServerSocketChannel.open();// 例項化
serverSocketChannel.bind(new InetSocketAddress(1234));// 繫結埠
Selector selector = Selector.open();// 例項化一個Selector
serverSocketChannel.configureBlocking(false);//設定非阻塞,才能進行下面的register
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 註冊可接受連線監聽
while (selector.select() > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();// 如果不移除,那麼下次遍歷還會處理這個selectionKey
if (key.isAcceptable()) {
handleAcceptEvent(selector,key);
} else if (key.isReadable()) {
handleReadEvent(selector,key);
}
Thread.sleep(1000);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 處理客戶端接入事件
*
* @param key
* @throws IOException
*/
private static void handleAcceptEvent(Selector selector,SelectionKey key) throws IOException {
SocketChannel client = null;// 客戶端
ServerSocketChannel server = (ServerSocketChannel) key.channel();// 服務端
try {
client = server.accept();// 客戶端接入產生一個SocketChannel
System.out.println("server receive a connet request of client!");
if (client == null) {
return;
}
client.configureBlocking(false);// 設定客戶端通道為非阻塞
client.register(selector, SelectionKey.OP_READ);//一般不會主動設定SelectionKey.OP_WRITE,因為緩衝區會一直處於可寫狀態,無限觸發select()
} catch (Exception e) {
server.close();
}
}
/**
* 處理讀監聽事件
* @param key
* @throws IOException
*/
private static void handleReadEvent(Selector selector,SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer);
byte[] data = readBuffer.array();
String msg = new String(data).trim();//消除空格
System.out.println("server receive the client msg:" + msg);
String outMsg = "I am the server!";
ByteBuffer writeBuffer = ByteBuffer.wrap(outMsg.getBytes());
client.write(writeBuffer);
}
}
package com.xdong.nio.v2.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* v2=執行緒池、單reactor
* @author xiongchaoqun
* @date 2018年7月2日
*/
public class NioServer {
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;// 宣告服務端Channel
try {
serverSocketChannel = ServerSocketChannel.open();// 例項化
serverSocketChannel.bind(new InetSocketAddress(1234));// 繫結埠
Selector selector = Selector.open();// 例項化一個Selector
serverSocketChannel.configureBlocking(false);//設定非阻塞,才能進行下面的register
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 註冊可接受連線監聽
while (selector.select() > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();// 如果不移除,那麼下次遍歷還會處理這個selectionKey
if (key.isAcceptable()) {
handleAcceptEvent(selector,key);
} else if (key.isReadable()) {
new Processor().process(key);//使用執行緒池處理
}
Thread.sleep(1000);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 處理客戶端接入事件
*
* @param key
* @throws IOException
*/
private static void handleAcceptEvent(Selector selector,SelectionKey key) throws IOException {
SocketChannel client = null;// 客戶端
ServerSocketChannel server = (ServerSocketChannel) key.channel();// 服務端
try {
client = server.accept();// 客戶端接入產生一個SocketChannel
System.out.println("server receive a connet request of client!");
if (client == null) {
return;
}
client.configureBlocking(false);// 設定客戶端通道為非阻塞
client.register(selector, SelectionKey.OP_READ);//一般不會主動設定SelectionKey.OP_WRITE,因為緩衝區會一直處於可寫狀態,無限觸發select()
} catch (Exception e) {
server.close();
}
}
/**
* 處理讀監聽事件
* @param key
* @throws IOException
*/
private static void handleReadEvent(Selector selector,SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer);
byte[] data = readBuffer.array();
String msg = new String(data).trim();//消除空格
System.out.println("server receive the client msg:" + msg);
String outMsg = "I am the server!";
ByteBuffer writeBuffer = ByteBuffer.wrap(outMsg.getBytes());
client.write(writeBuffer);
}
}
package com.xdong.nio.v3.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* v3=執行緒池+雙reactor(雙Selector)
* @author xiongchaoqun
* @date 2018年7月2日
*/
public class NioServer {
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;// 宣告服務端Channel
try {
serverSocketChannel = ServerSocketChannel.open();// 例項化
serverSocketChannel.bind(new InetSocketAddress(1234));// 繫結埠
Selector selector = Selector.open();// 例項化一個Selector
serverSocketChannel.configureBlocking(false);//設定非阻塞,才能進行下面的register
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 註冊可接受連線監聽
int coreNum = 2;
Processor[] processors = new Processor[coreNum];
for (int i = 0; i < processors.length; i++) {
processors[i] = new Processor();
}
int acceptNum = 0;
while (selector.select() > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();// 如果不移除,那麼下次遍歷還會處理這個selectionKey
if (key.isAcceptable()) {
acceptNum++;
SocketChannel channel = null;// 客戶端
ServerSocketChannel server = (ServerSocketChannel) key.channel();// 服務端
channel = server.accept();// 客戶端接入產生一個SocketChannel
System.out.println("server receive a connet request of client!");
Processor processor = processors[acceptNum%coreNum];
System.out.println(1);
processor.addChannel(channel);//將SocketChannel交給另一個Selector去處理
System.out.println(142);
processor.wakeup();
}
Thread.sleep(1000);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.xdong.nio.v2.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* v2=執行緒池、單reactor
* @author xiongchaoqun
* @date 2018年7月2日
*/
public class NioClient {
public static void main(String[] args) throws Exception{
SocketChannel socketChannel = null;
socketChannel = SocketChannel.open();
Selector selector = Selector.open();
socketChannel.configureBlocking(false);//設定非阻塞,這樣才能使用Selector
socketChannel.connect(new InetSocketAddress(1234));//客戶端連線--這裡其實客戶端並沒有完成連線,是在channel.finishConnect();才能完成連線
socketChannel.register(selector, SelectionKey.OP_CONNECT);//註冊可連線事件
while (selector.select() > 0) {
Iterator<SelectionKey>it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
it.remove();
if (key.isConnectable()) {
handleConnectEvent(selector, key);
}else if(key.isReadable()) {
handleReadEvent(selector, key);
}
Thread.sleep(1000);
}
}
}
public static void handleConnectEvent(Selector selector,SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
if (client.isConnectionPending()) {
client.finishConnect();
}
client.configureBlocking(false);//設定通道為非阻塞
client.write(ByteBuffer.wrap(new String("I want to connet to the server!").getBytes()));//服務端 讀操作會讀到
client.register(selector, SelectionKey.OP_READ);//註冊可讀時間
}
public static void handleReadEvent(Selector selector,SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);//宣告一個bytebuffer
client.read(buffer);
byte[] bytes = buffer.array();
String getMsg = new String(bytes).trim();
System.out.println("client receive server msg is "+getMsg);
client.write(ByteBuffer.wrap(new String("I am a client!").getBytes()));
}
}
Demo專案程式碼已上傳至我的Git上,有興趣的可以去fork一下。