1. 程式人生 > >Java BIO, NIO, AIO 簡單理解

Java BIO, NIO, AIO 簡單理解

1, 同步 VS. 非同步

比較的是當某種事件發生時呼叫處理函式的方式,

同步:應用程式主動呼叫處理函式,

非同步:作業系統呼叫應用程式的處理函式(也叫回調函式),對應用程式而言是被動過程。

2, 阻塞 VS. 非阻塞

阻塞:應用程式發起IO請求,IO請求(假設是網路上的IO)會經過網路、網絡卡、作業系統等,在這個過程中應用程式必須等到作業系統完成IO操作,才能繼續後面的事情,因此稱為阻塞,

非阻塞:應用程式在發起IO請求後不必等待作業系統完成IO操作,應用程式可以繼續做後面的事情,應用程式可以主動查詢(輪訓)某種IO事件是否完成,並呼叫相應的處理函式,或者在IO事件發生時作業系統呼叫應用程式的回撥函式。

3, BIO,NIO,AIO

JDK版本   同步and 阻塞  模型型別 隱喻
BIO 1.0 同步阻塞 一連線一執行緒

一個人開了一個餐廳,需要負責迎客(Accept connection)、

等待客戶點菜(Read data)、炒菜(處理請求)、

等待客戶用餐(Write data)、收銀(Close connection),此人在任何時刻

只能服務於一個客戶,新來的客戶必須等待。

改進型:一人迎客,之後再交由店裡空閒的員工獨自完成點菜、炒菜、

等待客戶用餐、收銀等操作,即多執行緒模式,這在一定程度上能提高效能。

NIO 1.4 同步非阻塞

一請求一執行緒,Reactor

一個人開了一個餐廳,負責迎客等所有操作,迎客後讓客戶點菜,在客戶點菜的過程中可以

做其它的操作,並每隔一段時間檢視客戶是否點菜完畢,如果點菜完畢則開始

炒菜。其它操作類似,如客戶用餐,不用等待客戶用餐,只需每隔一段時間檢視

客戶是否用餐完畢,再進行後續操作。

改進型:一個人負責檢查是否有某種事件發生,如是否有新客戶到來,是否有某個客戶點菜

完畢,是否有某個客戶用餐完畢等等,多個員工負責處理事件,如某個客戶點菜完畢

則讓一個空閒的員工來炒菜,如果某個客戶用餐完畢,則讓一個空閒的員工來收銀等等。

由於不用等待客戶(即沒有IO阻塞),在很大程度上提高了系統系能。

AIO 1.7 非同步非阻塞

一有效請求一執行緒,Proactor,

需要OS支援,Windows:IOCP,Linux:epoll

與NIO類似,只是不用主動查詢是否有某種事件發生,而是被告知該事件已發生。如客戶

點菜完畢後會主動說“我已經點菜完畢了”,然後讓一個空閒的員工來取選單並炒菜。

AIO是真正意義的非同步IO,需要OS支援,在讀IO資料時作業系統會將資料讀入到應用程式

指定的快取(ByteByffer)中,應用程式直接使用即可,不像NIO還要應用程式自己讀取。

AIO能簡化應用程式的編寫,因為其只關心有效的請求,將指定的請求繫結到對應的回撥函式

中即可,OS會自動呼叫回撥函來處理請求。

4, BIO程式碼片段

package com.my.study.io.bio;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {
	private static final int PORT = 8888;
	private ServerSocket serverSocket = null;

	private ExecutorService service = Executors.newFixedThreadPool(5);

	private void startServer() {
		try {
			serverSocket = new ServerSocket(PORT);
			System.out.println("Server started on port: " + PORT);

			while (true) {
				Socket socket = serverSocket.accept();

				System.out.println("Received a new connection.");

				service.submit(new RequestResolver(socket));
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private static class RequestResolver implements Runnable {
		private Socket socket;

		public RequestResolver(Socket socket) {
			this.socket = socket;
		}

		public void run() {

			DataInputStream din = null;
			DataOutputStream dout = null;

			try {

				long threadId = Thread.currentThread().getId();

				InputStream in = socket.getInputStream();
				din = new DataInputStream(in);
				String str = din.readUTF();
				System.out.println("Thread: " + threadId + ", read message: "
						+ str);

				OutputStream out = socket.getOutputStream();
				dout = new DataOutputStream(out);
				dout.writeUTF("Response from server.");
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				if (din != null) {
					try {
						din.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}

				if (dout != null) {
					try {
						din.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}

				if (!socket.isClosed()) {
					try {
						socket.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}

	public static void main(String[] args) {
		new Server().startServer();
	}
}


package com.my.study.io.bio;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class Client {

	private static final int PORT = 8888;

	private static void testSendMessage() {

		Socket socket = null;

		DataInputStream din = null;
		DataOutputStream dout = null;

		try {
			socket = new Socket("127.0.0.1", PORT);

			OutputStream out = socket.getOutputStream();
			dout = new DataOutputStream(out);
			dout.writeUTF("This message is from a client.");

			InputStream in = socket.getInputStream();
			din = new DataInputStream(in);
			String str = din.readUTF();
			System.out.println("Response from server: " + str);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (din != null) {
				try {
					din.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}

			if (dout != null) {
				try {
					din.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}

			if (!socket.isClosed()) {
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}

	public static void main(String[] args) {
		testSendMessage();
	}
}
5,NIO程式碼片段
<pre class="java" name="code">package com.my.study.io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {
	private static final int PORT = 8888;
	private Selector selector;

	private void startServer() {
		try {
			selector = Selector.open();
			ServerSocketChannel serverSocketChannel = ServerSocketChannel
					.open();
			serverSocketChannel.configureBlocking(false);
			SocketAddress address = new InetSocketAddress(PORT);
			serverSocketChannel.bind(address);
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

			System.out.println("Server started on port: " + PORT);
			while (true) {
				int selectCode = selector.select(1000);
				if (selectCode == 0) {
					continue;
				}

				Set<SelectionKey> keys = selector.selectedKeys();
				Iterator<SelectionKey> it = keys.iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					if (key.isAcceptable()) {
						System.out.println("Received a request.");
						serverSocketChannel = (ServerSocketChannel) key
								.channel();
						SocketChannel socketChannel = serverSocketChannel
								.accept();
						socketChannel.configureBlocking(false);
						socketChannel.register(selector, SelectionKey.OP_READ);
					} else if (key.isReadable()) {
						readData(key);
					} else {
						System.out.println("Key is no recognised, key: " + key);
					}
					it.remove();
				}

			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private void readData(SelectionKey key) {
		SocketChannel socketChannel = (SocketChannel) key.channel();
		ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
		try {
			socketChannel.read(buffer);
			byte[] bytes = buffer.array();
			String message = new String(bytes).toString();
			System.out.println("Received message from client: " + message);

			socketChannel.write(ByteBuffer.wrap("Response from server."
					.getBytes()));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		new Server().startServer();
	}
}


package com.my.study.io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Client {

	private Selector selector;

	public void initClient(String ip, int port) throws IOException {
		SocketChannel channel = SocketChannel.open();
		channel.configureBlocking(false);
		this.selector = Selector.open();
		channel.connect(new InetSocketAddress(ip, port));
		channel.register(selector, SelectionKey.OP_CONNECT);
	}

	public void listen() throws IOException {
		while (true) {
			int selectCode = selector.select(1000);
			if (selectCode == 0) {
				continue;
			}

			Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
			while (it.hasNext()) {
				SelectionKey key = (SelectionKey) it.next();
				if (key.isConnectable()) {
					SocketChannel channel = (SocketChannel) key.channel();
					if (channel.isConnectionPending()) {
						channel.finishConnect();

					}
					channel.configureBlocking(false);
					channel.register(this.selector, SelectionKey.OP_READ);

					channel.write(ByteBuffer.wrap(new String(
							"Message from a client.").getBytes()));
				} else if (key.isReadable()) {
					readData(key);
				}
				it.remove();
			}
		}
	}

	public void readData(SelectionKey key) throws IOException {
		SocketChannel channel = (SocketChannel) key.channel();
		ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
		channel.read(buffer);
		byte[] data = buffer.array();
		String msg = new String(data).trim();
		System.out.println("Response from server: " + msg);
	}

	public static void main(String[] args) throws IOException {
		Client client = new Client();
		client.initClient("localhost", 8888);
		client.listen();
	}

}

6,AIO程式碼片段
<pre class="java" name="code">package com.my.study.io.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.TimeUnit;

public class Server {
	private static final int PORT = 8888;

	private AsynchronousServerSocketChannel server;

	private void startServer() {
		try {
			server = AsynchronousServerSocketChannel.open().bind(
					new InetSocketAddress(PORT));

			System.out.println("Server started on port: " + PORT);

			server.accept(null,
					new CompletionHandler<AsynchronousSocketChannel, Object>() {
						final ByteBuffer buffer = ByteBuffer
								.allocate(1024 * 1024);

						public void completed(AsynchronousSocketChannel result,
								Object attachment) {
							buffer.clear();
							try {
								result.read(buffer).get(100, TimeUnit.SECONDS);
								buffer.flip();
								String str = new String(buffer.array());
								System.out.println("Received message: " + str);

								result.write(ByteBuffer
										.wrap("Response from server."
												.getBytes()));
							} catch (Exception e) {
								e.printStackTrace();
							} finally {
								try {
									result.close();
									server.accept(null, this);
								} catch (IOException e) {
									e.printStackTrace();
								}
							}
						}

						public void failed(Throwable exc, Object attachment) {
							System.out.println("Failed: " + exc);
						}
					});
			while (true) {
				System.out.println("Server main thread.");
				try {
					Thread.sleep(1000 * 60 * 60 * 24 * 365 * 100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		new Server().startServer();
	}
}


package com.my.study.io.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class Client {
	private AsynchronousSocketChannel client;

	private void start() {
		try {
			client = AsynchronousSocketChannel.open();
		} catch (IOException e) {
			e.printStackTrace();
			return;
		}

		client.connect(new InetSocketAddress("127.0.0.1", 8888), null,
				new CompletionHandler<Void, Object>() {
					public void completed(Void result, Object attachment) {
						client.write(ByteBuffer
								.wrap("Request message from client.".getBytes()));
					}

					public void failed(Throwable exc, Object attachment) {
						System.out.println("Connect Failed: " + exc);
					}
				});

		final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
		client.read(buffer, null, new CompletionHandler<Integer, Object>() {
			public void completed(Integer result, Object attachment) {
				String str = new String(buffer.array());
				System.out.println("Response from server: " + str);
			}

			public void failed(Throwable exc, Object attachment) {
				System.out.println("Read Failed: " + exc);
			}
		});
		
		while (true) {
			System.out.println("Client main thread.");
			try {
				Thread.sleep(1000 * 60 * 60 * 24 * 365 * 100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		new Client().start();
	}
}