走進分散式Java開發(一)—Java NIO
概述
何為NIO,百度百科上的解釋為:
java.nio全稱java non-blocking IO(實際上是 new io),是指jdk1.4 及以上版本里提供的新api(New IO) ,為所有的原始型別(boolean型別除外)提供快取支援的資料容器,使用它可以提供非阻塞式的高伸縮性網路。
為所有的原始型別提供(Buffer)快取支援。字符集編碼解碼解決方案。 Channel :一個新的原始I/O 抽象。 支援鎖和記憶體對映檔案的檔案訪問介面。 提供多路(non-blocking) 非阻塞式的高伸縮性網路I/O 。
簡單來說,傳統Java是面向物件的,而NIO是面向緩衝區的,傳統的IO是阻塞的,NIO是非阻塞的。
核心部分
通道
通道(channel)和流(stream)最大的不同,就是流是單向的,通道是雙向的。
- FileChannel(IO)
- DatagramChannel(UDP)
- SocketChannel(TCP)
- ServerSocketChannel(TCP)
緩衝區
NIO中的關鍵Buffer實現有:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分別對應基本資料型別: byte, char, double, float, int, long, short。當然NIO中還有MappedByteBuffer, HeapByteBuffer, DirectByteBuffer等
Selector
Selector執行單執行緒處理多個Channel,如果你的應用打開了多個通道,但每個連線的流量都很低,使用Selector()方法就會很方便,例如在一個聊天伺服器中。要使用Selector, 得向Selector註冊Channel,然後呼叫它的select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,執行緒就可以處理這些事件,事件的例子有如新的連線進來、資料接收等。
NIO TCP例項
首先
向buffer中寫資料用fileChannel.read()
向buffer中讀資料用fileChannel.write()
read返回值為-1的時候表示連線斷開
客戶端程式碼用NIO寫:
package NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;
import java.util.logging.SocketHandler;
public class Client {
public static void main(String[] args){
client();
}
public static void client(){
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = null;
try{
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1",9999));
if(socketChannel.finishConnect()){
int i = 0;
while(true){
TimeUnit.SECONDS.sleep(1);
String info = "I'm" + i++ + "-th information from client";
buffer.clear();
buffer.put(info.getBytes());
buffer.flip();//flip方法將Buffer從寫模式切換到讀模式。呼叫flip()方法會將position設回0,並將limit設定成之前position的值。
while(buffer.hasRemaining()){
System.out.println(buffer);
socketChannel.write(buffer);
}
}
}
}catch (IOException | InterruptedException e){
e.printStackTrace();
}
finally {
try{
if(socketChannel != null){
socketChannel.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
}
服務端程式碼先用傳統IO來寫:
package NIO;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.stream.IntStream;
public class Server {
public static void main(String[] arg){
server();
}
public static void server(){
ServerSocket serverSocket = null;
InputStream in = null;
try{
serverSocket = new ServerSocket(9999);
int recvMsgSize = 0;
byte[] recvBuf = new byte[1024];
while (true){
Socket clntSocket = serverSocket.accept();
SocketAddress clientAddress = clntSocket.getRemoteSocketAddress();//獲取另一端的IP和埠
System.out.println("Handing client at"+clientAddress);
in = clntSocket.getInputStream();
while((recvMsgSize = in.read(recvBuf)) != -1){
byte[] temp = new byte[recvMsgSize];
System.arraycopy(recvBuf,0,temp,0,recvMsgSize);
System.out.println(new String(temp));
}
}
}catch (IOException e){
e.printStackTrace();
}
finally {
try{
if(serverSocket != null){
serverSocket.close();
}
if(in != null){
in.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
}
用NIO寫的服務端:
package NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class ServerConnect {
private static final int BUF_SIZE = 1024;
private static final int PORT = 9999;
private static final int TIMEOUT = 3000;
public static void main(String[] args){
selector();
}
public static void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(),SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));//
用selector必須要註冊
}
public static void handleRead(SelectionKey key)throws IOException{
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer buf = (ByteBuffer)key.attachment();
long bytesRead = sc.read(buf);
while(bytesRead > 0){
buf.flip();
while(buf.hasRemaining()){
System.out.println((char)buf.get());
}
System.out.println();
buf.clear();
bytesRead = sc.read(buf);
}
if(bytesRead == -1){
sc.close();
}
}
public static void handleWrite(SelectionKey key)throws IOException{
ByteBuffer buf = (ByteBuffer)key.attachment();
buf.flip();
SocketChannel sc = (SocketChannel)key.channel();
while(buf.hasRemaining()){
sc.write(buf);
}
buf.compact();//將所有未讀的資料拷貝到Buffer起始處。然後將position設到最後一個未讀元素正後面
}
public static void selector(){
Selector selector = null;
ServerSocketChannel ssc = null;
try{
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(PORT));
ssc.configureBlocking(false);
ssc.register(selector,SelectionKey.OP_ACCEPT);
while(true){
if(selector.select(TIMEOUT) == 0){
System.out.println("==");
continue;
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
if(key.isAcceptable()){
handleAccept(key);
}
if(key.isReadable()){
handleRead(key);
}
if(key.isWritable() && key.isValid()){
handleWrite(key);
}
if(key.isConnectable()){
System.out.println("isConnectable = true");
}
iter.remove();
}
}
}catch (IOException e){
e.printStackTrace();
}finally {
try{
if(selector != null){
selector.close();
}
if(ssc != null){
ssc.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
}