1. 程式人生 > >Java 標準IO框架與NIO框架詳解

Java 標準IO框架與NIO框架詳解

    在看這篇文章之前,可以先去看看我部落格中另一篇關於同步與非同步、阻塞與非阻塞的理解

Java標準IO(BIO)

    BIO全稱Blocking IO又叫做同步阻塞IO,它存在如下特點:

  • 面向流
  • 同步
  • 阻塞
package com.xdong.bio.client;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class BioClient {
	
	public static void main(String[] args) {
		Socket client = null;
		InputStream inputStream = null;
		OutputStream outputStream = null;
		try {
			client = new Socket("127.0.0.1", 8888);
			inputStream = client.getInputStream();
			outputStream = client.getOutputStream();
			outputStream.write("Hello Server I am conneting you!".getBytes());
			outputStream.flush();
			byte[] bytes = new byte[1024];
			while (inputStream.read(bytes) > 0) {//這裡必須一次讀完才能進行下次讀操作,否則下一次會將上一次的
				String msg = new String(bytes).trim();
				System.out.println("client receive the message of server:" + msg);
				outputStream.write("client get the message !".getBytes());
				outputStream.flush();
				Thread.sleep(1*1000);
			}
		} catch (Exception e) {
			// TODO: handle exception
		}
	}
}
package com.xdong.bio.server;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BioServer {
	private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(5);
	
	public static void main(String[] args) {
		ServerSocket server = null;
		Socket socket = null;
		try {
			server = new ServerSocket(8888);
			while (!Thread.interrupted()) {
				socket = server.accept();
				THREAD_POOL.submit(new SocketHandler(socket));
			}
		} catch (Exception e) {
			// TODO: handle exception
		}
	}
	
	static class SocketHandler implements Runnable{
		private Socket socket;
		public SocketHandler(Socket socket) {
			super();
			this.socket = socket;
		}
		@Override
		public void run() {
			InputStream inputStream = null;
			OutputStream outputStream = null;
			try {
				inputStream = socket.getInputStream();
				outputStream = socket.getOutputStream();
				byte[] bs = new byte[1024];
				while (true) {
					int available = inputStream.available();// 輸入流中有多少可用位元組
					inputStream.read(bs, 0, available);//  從輸出流中將資料寫入位元組陣列(注:如果位元組陣列原本有100個位元組,本次讀取50個位元組,那麼它只會覆蓋前面50個位元組資料,位元組陣列中還是有100個位元組資料)
					String receive = new String(bs, 0,available).trim();
					System.out.println("server receive the message of client:" + receive);
					outputStream.write("server get the message!".getBytes());//write函式-阻塞
					outputStream.flush();
					Thread.sleep(1*1000);
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

}

     簡單分析一下上面程式碼:

  1. 繫結一個埠,產生一個SocketServer物件,並呼叫accept()方法,監聽這個埠,接收客戶端連線。
  2. 當有客戶端接入後,服務端單獨起一個執行緒,利用socket與客戶端互動。
  3. 利用socket.getInputStream流接收客戶端傳送的訊息,利用socket.getOutputStream流傳送訊息給客戶端。
  4. 注意:server.accept、inpustream.read、outputstream.write都會阻塞執行緒。
  5. 同步體現在服務端與客戶端的互動只能一步步順序進行。阻塞主要體現在第4點中的幾個方法。即使採用了多執行緒技術,實現了主執行緒非阻塞,但是子執行緒中與客戶端互動依舊是阻塞的。

Java NIO框架

    NIO,全稱是Non-Blocking IO或New IO,解釋就是非阻塞IO和新版IO。下面將給出三份不同的Server端程式碼,分別表示為:單Selector-單執行緒模式、單Selector-多執行緒模式、多Selector-多執行緒模式。

    NIO的特性如下:

  • 面向緩衝區
  • 同步
  • 非阻塞
package com.xdong.nio.v1.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * v1=單執行緒、單reactor
 * @author xiongchaoqun
 * @date 2018年7月2日
 */
public class NioServer {

	public static void main(String[] args) {
		ServerSocketChannel serverSocketChannel = null;// 宣告服務端Channel
		try {
			serverSocketChannel = ServerSocketChannel.open();// 例項化
			serverSocketChannel.bind(new InetSocketAddress(1234));// 繫結埠
			Selector selector = Selector.open();// 例項化一個Selector
			serverSocketChannel.configureBlocking(false);//設定非阻塞,才能進行下面的register
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 註冊可接受連線監聽
			while (selector.select() > 0) {
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();// 如果不移除,那麼下次遍歷還會處理這個selectionKey
					if (key.isAcceptable()) {
						handleAcceptEvent(selector,key);
					} else if (key.isReadable()) {
						handleReadEvent(selector,key);
					}
					Thread.sleep(1000);
				}
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 處理客戶端接入事件
	 * 
	 * @param key
	 * @throws IOException
	 */
	private static void handleAcceptEvent(Selector selector,SelectionKey key) throws IOException {
		SocketChannel client = null;// 客戶端
		ServerSocketChannel server = (ServerSocketChannel) key.channel();// 服務端
		try {
			client = server.accept();// 客戶端接入產生一個SocketChannel
			System.out.println("server receive a connet request of client!");
			if (client == null) {
				return;
			}
			client.configureBlocking(false);// 設定客戶端通道為非阻塞
			client.register(selector, SelectionKey.OP_READ);//一般不會主動設定SelectionKey.OP_WRITE,因為緩衝區會一直處於可寫狀態,無限觸發select()
		} catch (Exception e) {
			server.close();
		}
	}

	/**
	 * 處理讀監聽事件
	 * @param key
	 * @throws IOException
	 */
	private static void handleReadEvent(Selector selector,SelectionKey key) throws IOException {
		SocketChannel client = (SocketChannel) key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(1024);
		client.read(readBuffer);
		byte[] data = readBuffer.array();
		String msg = new String(data).trim();//消除空格
		System.out.println("server receive the client msg:" + msg);
		String outMsg = "I am the server!";
		ByteBuffer writeBuffer = ByteBuffer.wrap(outMsg.getBytes());
		client.write(writeBuffer);
	}

}
package com.xdong.nio.v2.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * v2=執行緒池、單reactor
 * @author xiongchaoqun
 * @date 2018年7月2日
 */
public class NioServer {

	public static void main(String[] args) {
		ServerSocketChannel serverSocketChannel = null;// 宣告服務端Channel
		try {
			serverSocketChannel = ServerSocketChannel.open();// 例項化
			serverSocketChannel.bind(new InetSocketAddress(1234));// 繫結埠
			Selector selector = Selector.open();// 例項化一個Selector
			serverSocketChannel.configureBlocking(false);//設定非阻塞,才能進行下面的register
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 註冊可接受連線監聽
			while (selector.select() > 0) {
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();// 如果不移除,那麼下次遍歷還會處理這個selectionKey
					if (key.isAcceptable()) {
						handleAcceptEvent(selector,key);
					} else if (key.isReadable()) {
						new Processor().process(key);//使用執行緒池處理
					}
					Thread.sleep(1000);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 處理客戶端接入事件
	 * 
	 * @param key
	 * @throws IOException
	 */
	private static void handleAcceptEvent(Selector selector,SelectionKey key) throws IOException {
		SocketChannel client = null;// 客戶端
		ServerSocketChannel server = (ServerSocketChannel) key.channel();// 服務端
		try {
			client = server.accept();// 客戶端接入產生一個SocketChannel
			System.out.println("server receive a connet request of client!");
			if (client == null) {
				return;
			}
			client.configureBlocking(false);// 設定客戶端通道為非阻塞
			client.register(selector, SelectionKey.OP_READ);//一般不會主動設定SelectionKey.OP_WRITE,因為緩衝區會一直處於可寫狀態,無限觸發select()
		} catch (Exception e) {
			server.close();
		}
	}

	/**
	 * 處理讀監聽事件
	 * @param key
	 * @throws IOException
	 */
	private static void handleReadEvent(Selector selector,SelectionKey key) throws IOException {
		SocketChannel client = (SocketChannel) key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(1024);
		client.read(readBuffer);
		byte[] data = readBuffer.array();
		String msg = new String(data).trim();//消除空格
		System.out.println("server receive the client msg:" + msg);
		String outMsg = "I am the server!";
		ByteBuffer writeBuffer = ByteBuffer.wrap(outMsg.getBytes());
		client.write(writeBuffer);
	}

}

package com.xdong.nio.v3.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * v3=執行緒池+雙reactor(雙Selector)
 * @author xiongchaoqun
 * @date 2018年7月2日
 */
public class NioServer {

	public static void main(String[] args) {
		ServerSocketChannel serverSocketChannel = null;// 宣告服務端Channel
		try {
			serverSocketChannel = ServerSocketChannel.open();// 例項化
			serverSocketChannel.bind(new InetSocketAddress(1234));// 繫結埠
			Selector selector = Selector.open();// 例項化一個Selector
			serverSocketChannel.configureBlocking(false);//設定非阻塞,才能進行下面的register
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);// 註冊可接受連線監聽
			
		    int coreNum = 2;
		    Processor[] processors = new Processor[coreNum];
		    for (int i = 0; i < processors.length; i++) {
		    	processors[i] = new Processor();
		    }
		    int acceptNum = 0;
			while (selector.select() > 0) {
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();// 如果不移除,那麼下次遍歷還會處理這個selectionKey
					if (key.isAcceptable()) {
						acceptNum++;
						SocketChannel channel = null;// 客戶端
						ServerSocketChannel server = (ServerSocketChannel) key.channel();// 服務端
						channel = server.accept();// 客戶端接入產生一個SocketChannel
						System.out.println("server receive a connet request of client!");
						Processor processor = processors[acceptNum%coreNum];
						System.out.println(1);
						processor.addChannel(channel);//將SocketChannel交給另一個Selector去處理
						System.out.println(142);
						processor.wakeup();
					} 
					Thread.sleep(1000);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

package com.xdong.nio.v2.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * v2=執行緒池、單reactor
 * @author xiongchaoqun
 * @date 2018年7月2日
 */
public class NioClient {
	
	public static void main(String[] args) throws Exception{
		SocketChannel socketChannel = null;
		socketChannel = SocketChannel.open();
		Selector selector = Selector.open();
		socketChannel.configureBlocking(false);//設定非阻塞,這樣才能使用Selector
		socketChannel.connect(new InetSocketAddress(1234));//客戶端連線--這裡其實客戶端並沒有完成連線,是在channel.finishConnect();才能完成連線
		socketChannel.register(selector, SelectionKey.OP_CONNECT);//註冊可連線事件
		while (selector.select() > 0) {
			Iterator<SelectionKey>it = selector.selectedKeys().iterator();
			while (it.hasNext()) {
				SelectionKey key = (SelectionKey) it.next();
				it.remove();
				if (key.isConnectable()) {
					handleConnectEvent(selector, key);
				}else if(key.isReadable()) {
					handleReadEvent(selector, key);
				}
				Thread.sleep(1000);
			}
		}
	}
	
	
	public static void handleConnectEvent(Selector selector,SelectionKey key) throws IOException {
		SocketChannel client = (SocketChannel) key.channel();
		if (client.isConnectionPending()) {
			client.finishConnect();
		}
		client.configureBlocking(false);//設定通道為非阻塞
		client.write(ByteBuffer.wrap(new String("I want to connet to the server!").getBytes()));//服務端  讀操作會讀到
		client.register(selector, SelectionKey.OP_READ);//註冊可讀時間
	}
	
	public static void handleReadEvent(Selector selector,SelectionKey key) throws IOException {
		SocketChannel client = (SocketChannel) key.channel();
		ByteBuffer buffer = ByteBuffer.allocate(1024);//宣告一個bytebuffer
		client.read(buffer);
		byte[] bytes = buffer.array();
		String getMsg = new String(bytes).trim();
		System.out.println("client receive server msg is "+getMsg);
		client.write(ByteBuffer.wrap(new String("I am a client!").getBytes()));
	}
}
    Demo專案程式碼已上傳至我的Git上,有興趣的可以去fork一下。