Java NIO簡單例項(入門)
最近學習了一下Java NIO的開發,剛開始接觸selector,覺得有點繞,弄的有點暈,所以在這裡寫幾個簡單的例子,記錄一下,也與大家分享一下,對剛開始學習NIO的同學,希望有一些幫忙。大神就請多指點了。
開發穩定的NIO程式對工程師的要求很高,NIO本身也存在很多的BUG,本文的例子只是簡單地幫助理解NIO的一些概念,對於一些例如TCP粘包/拆包等問題,不予以考慮。
對於NIO的一些概念什麼的,就不在這裡闡述了,給大家推薦幾個文章,可以參考一下,裡面有詳細的解釋。
Java NIO基本概念:
(1) http://ifeve.com/java-nio-all/
(2)
Java NIO開發的注意事項:
(1)http://blog.csdn.net/martin_liang/article/details/41224503
例項一:不使用selector實現埠通訊
import org.apache.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Launcher for the first example: runs a {@link ServerMsgNoselector} and a
 * {@link ClientMsg} on two separate threads inside the same JVM.
 */
public class SocketTimePrint {
    private static final Logger logger = Logger.getLogger(SocketTimePrint.class);

    /**
     * Starts the server thread, pauses briefly, then starts the client thread.
     *
     * @param args ignored
     * @throws Exception if the pause between the two starts is interrupted
     */
    public static void main(String[] args) throws Exception {
        final Thread serverThread = new Thread(new ServerMsgNoselector());
        final Thread clientThread = new Thread(new ClientMsg());

        logger.info("Beginning to start the server for message" );
        serverThread.start();

        // Short pause before the client starts; presumably this gives the server
        // time to bind its port before the first connect attempt.
        Thread.sleep(100);

        logger.info("Beginning to start the client for message");
        clientThread.start();
    }
}
/**
 * Client side of the first example. Connects to 127.0.0.1:9999 and sends the
 * current timestamp (format {@code yyyy-MM-dd HH:mm:ss}) every two seconds
 * until the thread is interrupted or an I/O error occurs.
 */
class ClientMsg implements Runnable {
    private static final Logger logger = Logger.getLogger(ClientMsg.class);

    @Override
    public void run() {
        // try-with-resources guarantees the channel is closed on every exit path.
        try (SocketChannel socket = SocketChannel.open()) {
            socket.connect(new InetSocketAddress("127.0.0.1", 9999));
            socket.configureBlocking(false);
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (true) {
                String datestr = format.format(new Date());
                ByteBuffer buff = ByteBuffer.wrap(
                        datestr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                // In non-blocking mode a single write() may send only part of the
                // buffer, so keep writing until everything has been handed over.
                while (buff.hasRemaining()) {
                    socket.write(buff);
                }
                Thread.sleep(2000);
            }
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
            logger.warn("Client interrupted, stopping", ie);
        } catch (IOException ioe) {
            logger.error("Client I/O failure, stopping", ioe);
        }
    }
}
/**
 * Server side of the first example, written without a selector. Binds port
 * 9999, accepts exactly one client in blocking mode and logs every chunk of
 * data it receives until the client closes the connection.
 */
class ServerMsgNoselector implements Runnable {
    private static final Logger logger = Logger.getLogger(ServerMsgNoselector.class);

    @Override
    public void run() {
        // Both channels are closed automatically when the method exits.
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            serverChannel.socket().bind(new InetSocketAddress(9999));
            serverChannel.configureBlocking(true);
            try (SocketChannel client = serverChannel.accept()) {
                client.configureBlocking(true);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                while (true) {
                    buffer.clear();
                    int len = client.read(buffer);
                    if (len < 0) {
                        // read() returns -1 once the peer has closed the connection.
                        // Without this exit the loop would spin forever.
                        logger.info("The client closed the connection, stopping the server");
                        break;
                    }
                    if (len > 0) {
                        String msg = new String(buffer.array(), 0, len,
                                java.nio.charset.StandardCharsets.UTF_8);
                        logger.info("The recvicing message is " + msg);
                    } else {
                        logger.info("Waiting for message ...");
                    }
                }
            }
        } catch (IOException ie) {
            logger.error("Server I/O failure, stopping", ie);
        }
    }
}
例子很簡單,分別在兩個執行緒中啟動一個client與一個server,通過9999埠進行通訊。服務端的channel都設定為了阻塞模式。該例子中,只有一個客戶端與一個server,而當在實際應用時,client會有很多連線,我們可能也不只監聽一個埠,這時就會出現各種問題(可以看一下《Netty權威指南》,裡面對IO的基本概念及Java IO的演進有一個大致的介紹),而selector就是為了解決這些問題,以達到更高的效能。
例項二:基於selector實現的通訊
import org.apache.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
/**
 * Second example: the same timestamp client/server pair as the first example,
 * but the server multiplexes its channels through a {@link Selector}.
 */
public class SocketSCMode {
    private static final Logger logger = Logger.getLogger(SocketSCMode.class);

    /**
     * Creates the demo and runs it.
     *
     * @param args ignored
     * @throws Exception if starting the demo threads is interrupted
     */
    public static void main(String[] args) throws Exception {
        SocketSCMode leo = new SocketSCMode();
        leo.exe();
    }

    /**
     * Starts the server thread, pauses briefly, then starts the client thread.
     *
     * @throws InterruptedException if the pause between the two starts is interrupted
     */
    public void exe() throws InterruptedException {
        Thread server = new Thread(new ServerMsg());
        Thread client = new Thread(new ClientMsg());
        logger.info("Beginning to start the server for message");
        server.start();
        Thread.sleep(100);
        logger.info("Beginning to start the client for message");
        client.start();
    }

    /**
     * Connects to 127.0.0.1:9999 and sends the current timestamp every two
     * seconds until interrupted or until an I/O error occurs.
     */
    class ClientMsg implements Runnable {
        @Override
        public void run() {
            // try-with-resources guarantees the channel is closed on every exit path.
            try (SocketChannel socket = SocketChannel.open()) {
                socket.connect(new InetSocketAddress("127.0.0.1", 9999));
                socket.configureBlocking(false);
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                while (true) {
                    String datestr = format.format(new Date());
                    ByteBuffer buff = ByteBuffer.wrap(
                            datestr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    // A non-blocking write() may send only part of the buffer.
                    while (buff.hasRemaining()) {
                        socket.write(buff);
                    }
                    Thread.sleep(2000);
                }
            } catch (InterruptedException ie) {
                // Restore the interrupt flag so the owner of this thread can observe it.
                Thread.currentThread().interrupt();
                logger.warn("Client interrupted, stopping", ie);
            } catch (IOException ioe) {
                logger.error("Client I/O failure, stopping", ioe);
            }
        }
    }

    /**
     * Selector-driven server: accepts connections on port 9999 and logs every
     * message read from any connected client.
     */
    class ServerMsg implements Runnable {
        @Override
        public void run() {
            logger.info("Enter the server thread");
            try (ServerSocketChannel serverChannel = ServerSocketChannel.open();
                 Selector selector = Selector.open()) {
                serverChannel.socket().bind(new InetSocketAddress(9999));
                serverChannel.configureBlocking(false);
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                try {
                    // Counts select() rounds; used only to correlate log lines.
                    long flag = 1L;
                    while (true) {
                        int count = selector.select();
                        if (count > 0) {
                            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                            while (iter.hasNext()) {
                                SelectionKey key = iter.next();
                                logger.info("Flag-->" + flag + " The interest is " + key.interestOps());
                                if (key.isAcceptable()) {
                                    logger.info("Flag-->" + flag + " The server accept one requesting");
                                    // The selector never removes keys from the selected set
                                    // itself; a key left behind is handled again next round.
                                    iter.remove();
                                    acceptClient(selector, key);
                                } else if (key.isReadable()) {
                                    iter.remove();
                                    logger.info("Flag-->" + flag + " Reading the mesage");
                                    readMessage(key, flag);
                                }
                            }
                        }
                        flag++;
                    }
                } finally {
                    closeRegisteredChannels(selector);
                }
            } catch (IOException ie) {
                logger.error("Server I/O failure, stopping", ie);
            }
        }

        /** Accepts the pending connection and registers it for read events. */
        private void acceptClient(Selector selector, SelectionKey key) throws IOException {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        }

        /** Reads one chunk from the client and logs it; drops the client on EOF or error. */
        private void readMessage(SelectionKey key, long flag) {
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buff = ByteBuffer.allocate(1024);
            int len;
            try {
                len = client.read(buff);
            } catch (IOException ioe) {
                logger.warn("Flag-->" + flag + " Reading failed, dropping the client", ioe);
                len = -1;
            }
            if (len < 0) {
                // -1 means the peer closed the connection. A closed channel stays
                // readable forever, so it must be deregistered and closed here.
                key.cancel();
                closeQuietly(client);
                return;
            }
            if (len > 0) {
                String msg = new String(buff.array(), 0, len,
                        java.nio.charset.StandardCharsets.UTF_8);
                logger.info("Flag-->" + flag + " The recevied message is " + msg);
            }
        }

        /** Closes every channel still registered with the selector (best effort). */
        private void closeRegisteredChannels(Selector selector) {
            for (SelectionKey key : selector.keys()) {
                closeQuietly(key.channel());
            }
        }

        /** Closes a channel, logging instead of propagating any failure. */
        private void closeQuietly(java.nio.channels.Channel channel) {
            try {
                channel.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if close itself fails.
                logger.warn("Failed to close a channel", ignored);
            }
        }
    }
}
邏輯上與第一個例子完全一樣,要注意的是iter.remove()這個操作。
在該例子中,把if (key.isAcceptable())下的remove操作去掉,會報一個空指標異常。過程如下:
1、把ServerSocketChannel註冊到selector上,啟動client
2、第一次select,發現ServerSocketChannel是acceptable的,即有已選擇的鍵(不然執行緒會阻塞在select方法上),因此,返回遍歷SelectionKey的iter(已選擇鍵的集合)
3、進入if (key.isAcceptable())下的程式碼塊,通過accept返回一個SocketChannel與client進行通訊,並把這個SocketChannel註冊到selector上。遍歷結束
4、此時server端接收了client的連線,client開始傳送資料。server再執行select方法,發現SocketChannel是Readable,再次返回已選擇鍵的集合
5、由於第一次遍歷時,ServerSocketChannel沒有從已選擇鍵的集合中去掉,並且,selector不會更新已選擇鍵的集合中鍵的ready集合,因此,遍歷iter時,會再次進入if (key.isAcceptable()),而此時,沒有client進行連線請求,因此,accept方法返回的是一個null,也就出現了空指標異常。
如果我們去掉程式中的第二個remove會發生什麼?程序會一直阻塞在select方法中,因為select方法是增量的,已在已選擇鍵的集合中的channel,不會去再處理。也就是說,select方法,只有當已選擇鍵的集合有新元素新增時才會返回(這麼理解有點不準確。。。)
例項三:多人聊天室
這個例子是參照別人的程式碼寫的,當然,沒人家寫的好,修修補補的,總算能按大概的意思運行了,提供給大家做反面教材吧(-_-!!!)
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.*;
/**
 * Launcher for the third example: one chat-room server plus three clients, all
 * running as threads in the same JVM. The thread name doubles as the client's
 * chat name.
 */
public class MultiplayerChat {
    static {
        // Only errors from the server are logged; clients log at INFO.
        Logger.getLogger(ServerMsgRoom.class).setLevel(Level.ERROR);
        Logger.getLogger(ClientMsgRoom.class).setLevel(Level.INFO);
    }

    /**
     * Starts the server, pauses briefly, starts the three clients and then
     * waits for the server thread to finish.
     *
     * @param args ignored
     * @throws Exception if the pause or the join is interrupted
     */
    public static void main(String[] args) throws Exception {
        final Thread[] clientThreads = {
            new Thread(new ClientMsgRoom(true), "leo"),
            new Thread(new ClientMsgRoom(true), "Makukin"),
            new Thread(new ClientMsgRoom(true), "Forrestleo"),
        };
        final Thread serverThread = new Thread(new ServerMsgRoom(), "ServerRoom");

        System.out.println("Start server ...");
        serverThread.start();
        Thread.sleep(100);

        System.out.println("Start client");
        for (Thread clientThread : clientThreads) {
            clientThread.start();
        }
        serverThread.join();
    }

    /**
     * Tells whether the channel type supports read operations.
     *
     * <p>This inspects {@code validOps()}, i.e. the operations the channel
     * supports, not whether data is currently available.
     *
     * @param channel the channel to inspect
     * @return {@code true} if {@code OP_READ} is among the channel's valid operations
     */
    public static boolean canReadable(SocketChannel channel) {
        final int supported = channel.validOps();
        return (supported & SelectionKey.OP_READ) != 0;
    }

    /**
     * Tells whether the channel type supports write operations.
     *
     * <p>This inspects {@code validOps()}, i.e. the operations the channel
     * supports, not whether a write would currently succeed.
     *
     * @param channel the channel to inspect
     * @return {@code true} if {@code OP_WRITE} is among the channel's valid operations
     */
    public static boolean canWriteable(SocketChannel channel) {
        final int supported = channel.validOps();
        return (supported & SelectionKey.OP_WRITE) != 0;
    }
}
/**
 * Chat-room client. Registers itself with the server under the name of the
 * thread that runs it. Every five seconds it prints whatever the server sent
 * and, if anything was received, answers with its current local time.
 *
 * <p>Each instance is meant to be run by a single thread.
 */
class ClientMsgRoom implements Runnable {
    private static final Logger logger = Logger.getLogger(ClientMsgRoom.class);
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 9999;

    // SimpleDateFormat is not thread-safe, so every client owns its own instance
    // instead of sharing one static formatter across the client threads.
    private final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
    // Used for reads only; outgoing messages are wrapped in their own buffers.
    private final ByteBuffer buff = ByteBuffer.allocate(1024);
    private final boolean printFlag;
    private String name;

    /**
     * @param flag whether received messages are printed to standard output
     */
    public ClientMsgRoom(boolean flag) {
        printFlag = flag;
    }

    @Override
    public void run() {
        name = Thread.currentThread().getName();
        // try-with-resources guarantees the channel is closed on every exit path.
        try (SocketChannel client = SocketChannel.open()) {
            client.connect(new InetSocketAddress(HOST, PORT));
            client.configureBlocking(false);
            String headMsg = "Register : " + name + "\n";
            logger.info(headMsg);
            send(client, headMsg);
            logger.info("Register the client ... ");
            while (true) {
                boolean received = false;
                if (MultiplayerChat.canReadable(client)) {
                    int len;
                    buff.clear();
                    while ((len = client.read(buff)) > 0) {
                        String msg = new String(buff.array(), 0, len,
                                java.nio.charset.StandardCharsets.UTF_8);
                        if (printFlag) {
                            System.out.println(name + "-->\n" + msg);
                        }
                        received = true;
                        // Reset the position so the next read starts at offset 0 again,
                        // which is where the decoding above reads from.
                        buff.clear();
                    }
                    if (len < 0) {
                        // -1 means the server closed the connection.
                        logger.info("The server closed the connection, stopping " + name);
                        return;
                    }
                }
                if (received && MultiplayerChat.canWriteable(client)) {
                    String reMsg = name + "'s time is " + format.format(new Date()) + "\n";
                    send(client, reMsg);
                }
                Thread.sleep(5000);
            }
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
            logger.warn("Client " + name + " interrupted, stopping", ie);
        } catch (IOException ioe) {
            logger.error("Client " + name + " I/O failure, stopping", ioe);
        }
    }

    /** Writes the whole message, retrying until the non-blocking channel took every byte. */
    private void send(SocketChannel client, String message) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        while (out.hasRemaining()) {
            client.write(out);
        }
    }
}
/**
 * Chat-room server. Accepts clients on port 9999 and forwards every message it
 * reads to all connected clients, including the sender.
 *
 * <p>Outgoing messages are queued as a {@code List<String>} attached to each
 * client's selection key and written when the key becomes writable. All state
 * is touched only by the thread running {@link #run()}.
 */
class ServerMsgRoom implements Runnable {
    private static final Logger logger = Logger.getLogger(ServerMsgRoom.class);
    private static final int PORT = 9999;

    // Used for reads only; outgoing messages are wrapped in their own buffers.
    private final ByteBuffer buff = ByteBuffer.allocate(1024);
    // Selection keys of all currently connected clients.
    private final List<SelectionKey> chaters = new ArrayList<>();

    public ServerMsgRoom() {
    }

    @Override
    public void run() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(PORT));
            server.register(selector, SelectionKey.OP_ACCEPT);
            try {
                while (true) {
                    int count = selector.select();
                    if (count <= 0) {
                        continue;
                    }
                    logger.info("Begin to ack the keys and iter's size is " + count);
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        // The selector never removes handled keys from the selected set itself.
                        iter.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isAcceptable()) {
                            acceptClient(selector, key);
                        } else if (key.isReadable()) {
                            readAndBroadcast(key);
                        } else if (key.isWritable()) {
                            flushPending(key);
                        }
                    }
                }
            } finally {
                closeRegisteredChannels(selector);
            }
        } catch (IOException ie) {
            logger.error("Server I/O failure, stopping", ie);
        }
    }

    /** Accepts a pending connection and registers it as a chat participant. */
    private void acceptClient(Selector selector, SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel socket = serverChannel.accept();
        if (socket == null) {
            // A non-blocking accept() returns null when no connection is pending any more.
            return;
        }
        logger.info("Add one register");
        socket.configureBlocking(false);
        chaters.add(socket.register(selector, SelectionKey.OP_READ));
    }

    /** Drains the client's channel and queues the text for every connected client. */
    private void readAndBroadcast(SelectionKey key) {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        StringBuilder builder = new StringBuilder();
        int len = 0;
        try {
            buff.clear();
            while ((len = clientChannel.read(buff)) > 0) {
                builder.append(new String(buff.array(), 0, len,
                        java.nio.charset.StandardCharsets.UTF_8));
                // Reset the position so the next read starts at offset 0 again,
                // which is where the decoding above reads from.
                buff.clear();
            }
        } catch (IOException ioe) {
            logger.warn("Reading from a client failed, dropping it", ioe);
            len = -1;
        }
        if (len < 0) {
            // -1 means the peer closed the connection. Without deregistering it,
            // the key would be reported readable on every select() from now on.
            dropClient(key);
        }
        if (builder.length() == 0) {
            return;
        }
        String message = builder.toString();
        logger.info("PrintMessage " + message);
        for (SelectionKey recipient : chaters) {
            if (!recipient.isValid()) {
                continue;
            }
            pendingFor(recipient).add(message);
            // Every recipient needs write interest, not just the sender; otherwise
            // its queue is only flushed after it happens to send something itself.
            recipient.interestOps(recipient.interestOps() | SelectionKey.OP_WRITE);
        }
    }

    /** Writes all queued messages to the client and switches the key back to read-only interest. */
    private void flushPending(SelectionKey key) {
        @SuppressWarnings("unchecked") // only pendingFor() attaches objects to client keys
        List<String> list = (List<String>) key.attachment();
        SocketChannel writer = (SocketChannel) key.channel();
        if (list != null) {
            logger.info("The attaching list is " + list.toString());
            try {
                Iterator<String> iterReader = list.iterator();
                while (iterReader.hasNext()) {
                    String message = iterReader.next();
                    // Wrap the message instead of copying it into the fixed 1024-byte
                    // buffer, which would overflow for longer messages.
                    ByteBuffer out = ByteBuffer.wrap(
                            message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    while (out.hasRemaining()) {
                        writer.write(out);
                    }
                    logger.info("Writing message is " + message);
                    iterReader.remove();
                }
            } catch (IOException ioe) {
                logger.warn("Writing to a client failed, dropping it", ioe);
                dropClient(key);
                return;
            }
        } else {
            logger.warn("The list is null");
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    /** Returns the key's outgoing message queue, creating and attaching it on first use. */
    @SuppressWarnings("unchecked") // only this method attaches objects to client keys
    private List<String> pendingFor(SelectionKey key) {
        List<String> list = (List<String>) key.attachment();
        if (list == null) {
            list = new ArrayList<>();
            key.attach(list);
        }
        return list;
    }

    /** Removes the client from the room, deregisters its key and closes its channel. */
    private void dropClient(SelectionKey key) {
        chaters.remove(key);
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close itself fails.
            logger.warn("Failed to close a client channel", ignored);
        }
    }

    /** Closes every channel still registered with the selector (best effort). */
    private void closeRegisteredChannels(Selector selector) {
        for (SelectionKey key : selector.keys()) {
            try {
                key.channel().close();
            } catch (IOException ignored) {
                // Best-effort cleanup on shutdown.
                logger.warn("Failed to close a channel", ignored);
            }
        }
    }
}