使用javaNIO實現C/S模式的通訊
阿新 • • 發佈:2018-12-04
NIO使用非阻塞IO的方式實現,伺服器與客戶端的交流,適用於大量連線,而資料量少的情況。通過一個執行緒輪詢所有的通道,處理註冊的事件,而主執行緒可以繼續幹其他的事情。這樣所有的I/O都交給一個執行緒處理,減少了執行緒IO的切換。如果具體學習NIO的架構和原理請點選下面的連線
點選開啟連結 http://ifeve.com/selectors/
以下為一個使用NIO實現的C/S通訊模式,對應簡單例子學習,可以更容易入手。
服務端程式碼如下:
主執行緒類:MainChannel
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
public class MainChannel {
public static void main(String[] args) {
// TODO Auto-generated method stub
// 宣告伺服器監聽通道 和select選擇器
ServerSocketChannel serverChannel = null;
Selector selector = null;
try {
// 例項化selector 此種例項化模式說明 selector 是單例的
selector = Selector.open();
// 例項化伺服器監聽埠
serverChannel = ServerSocketChannel.open();
// 繫結監聽地址。
serverChannel.socket().bind(new InetSocketAddress(8881));
// 設定channel為非阻塞模式,一定要非阻塞模式才能註冊到selector中
serverChannel.configureBlocking(false);
// 把監聽通道註冊到選擇器中, 監聽此通道的連線事件。SelectionKey.OP_ACCEPT 指定為連線事件。
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//所有通道,交給通道管理器,統一管理
ChannelManager.addChannel(serverChannel);
// 開啟一個執行緒負責管理selector,並輪詢是否有註冊監聽的事件就緒。
Thread thread = new Thread(new ServerNIO(selector));
thread.start();
// 然後主執行緒 就可以幹其他的事情了。不管客戶端連線 還是I/O
// 都不會阻塞此執行緒,只會阻塞selector管理執行緒,且只在等待事件發生時阻塞。
} catch (IOException e) {
// TODO Auto-generated catch block
System.out.println("伺服器監聽發生異常");
}
}
}
管理selector的執行緒類:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class ServerNIO implements Runnable {
// 客戶端通道,用於給某個客戶端收發資料
private SocketChannel socketChannel;
// 緩衝區,使用者收發資料,面向通道。
private ByteBuffer buf = ByteBuffer.allocate(1024);
// 選擇器,從主執行緒注入
private Selector selector;
// 指定執行緒是否輪詢
private boolean flag = true;
// 構造器中注入selector
public ServerNIO(Selector selector) {
// TODO Auto-generated constructor stub
this.selector = selector;
}
// 開啟執行緒等待事件的發生,輪詢通道。
@Override
public void run() {
// TODO Auto-generated method stub
try {
while (flag) {
/**
* 獲取等待的事件就緒的通道,如果沒有通道有事件就緒有三種情況
* 1. 直接返回:selectNow();
* 2. 超時返回:select(int timeout);
* 3. 阻 塞: select();
*/
int nums = selector.select(); // 阻塞模式
if (nums > 0) { // 阻塞模式下 此次判定多餘
// 當事件發生了,獲取發生事件通道的key集合;
Set<SelectionKey> selectKeys = selector.selectedKeys();
// 迭代遍歷這個keys集合
Iterator<SelectionKey> iter = selectKeys.iterator();
while (iter.hasNext()) {
// 獲取單個通道的key
SelectionKey key = iter.next();
// 如果是讀取事件就緒。說明有客戶端向伺服器傳送資料了。
if (key.isReadable()) {
// 先獲取到客戶端channel
socketChannel = (SocketChannel) key.channel();
buf.clear();
// 利用buffer讀取資料。
int len = socketChannel.read(buf);
if (len > 0) {
byte[] str = new byte[len];
buf.rewind();
buf.get(str, 0, len);
buf.clear();
System.out.println("獲取客戶端資料:" + new String(str));
// 給客戶端回覆資料
String temp = "伺服器回覆: 已經收到您傳送的資料,祝您一路平安!";
buf.clear();
buf.put(temp.getBytes());
buf.flip();
socketChannel.write(buf);
System.out.println("已經向客戶端回覆");
//此處可以利用ChannelManager向其他所有通道廣播資料。只要在ChannelManager中寫一個廣播方法即可
}
} else if (key.isAcceptable()) { // 如果是接受客戶端連線就緒
// 從key中獲取對應的通道
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
// 接受這個連線。
SocketChannel socketChannel = serverChannel.accept();
// 如果連線不為空,則控制檯列印 連線的客戶端地址。
if (socketChannel != null) {
// 由於關閉selector的時候,並不會關閉通道,最好使用一個容器,將通道都儲存起來
//然後開啟心跳連線,如果通道出現異常則關閉此通道,當應用程式關閉的時候,關閉所有的通道。
ChannelManager.addChannel(socketChannel);
System.out.println(String.format("接收到客戶端的連線:IP[%s]-埠[%s]",
socketChannel.socket().getPort(), socketChannel.socket().getInetAddress()));
// 把這個通道設定為非阻塞模式,然後又註冊到selector中,等待事件發生
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
} //寫事件在緩衝區直接達到閾值時候出發,一般不註冊寫事件。
// 通道處理完畢後 將此通道刪除。如果下次又有此時間,會有一個新的key,所以記得刪除處理過的key。
iter.remove();
}
}
}
} catch (IOException e) {
System.out.println("伺服器斷開連線");
}
try {
// 注意此處只會關閉,selector使註冊到琪上面的selectionKeys無效,通道本身不會關閉。
selector.close();
ChannelManager.closeChannles();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
通道管理類:
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.LinkedList;
public class ChannelManager {
private static LinkedList<Channel> list = new LinkedList<>();
private static Thread thread; //用於開啟心跳測試通道連線,實現省略
public static void addChannel(Channel channel) {
list.add(channel);
}
//關閉所有的通道連線
public static void closeChannles() {
for (Channel channel : list) {
try {
channel.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("關閉通道失敗");
}
list.remove();
}
}
}
一個簡單的客戶端類: 這個指令碼可以直接通過cmd 編譯執行。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class MainChannel {
public static void main(String[] args) {
// TODO Auto-generated method stub
SocketChannel socketChannel = null;
boolean flag = true;
Scanner in = new Scanner(System.in);
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
socketChannel = SocketChannel.open();
// 設定連線為非阻塞模式,可能未連線完成就返回
// socketChannel.configureBlocking(false);
socketChannel.socket().connect(new InetSocketAddress("192.168.1.100", 8881));
// 判斷是否連線成功,等待連線成功
while (!socketChannel.finishConnect()) {
}
while (flag) {
String temp = in.nextLine();
buffer.clear();
buffer.put(temp.getBytes());
// limit指標 移動到 position位置
buffer.flip();
// 當buffer中有足夠空間,則寫到buffer中
while (buffer.hasRemaining())
socketChannel.write(buffer);
if ("exit".equals(temp))
flag = false;
}
} catch (IOException e) {
// TODO Auto-generated catch block
System.out.println("與服務斷開連線");
}
if (socketChannel != null) {
try {
socketChannel.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
上述程式碼,可以實現客戶端向伺服器傳送訊息,然後伺服器接受到訊息,在自己的控制檯列印輸出。