1. 程式人生 > 實用技巧 >IO、NIO實現簡單聊天室,附帶問題解析

IO、NIO實現簡單聊天室,附帶問題解析

  本篇文章主要使用IO和NIO的形式來實現一個簡單的聊天室,並且說明IO方法存在的問題,而NIO又是如何解決的。

  大概的框架為,先提供思路和大概框架圖——程式碼——問題及解決方式,這樣會容易看一點。

1. IO寫法

1.1 思路框架

  下面編寫一個簡單的聊天室,大概需要的功能就是服務端維護一個聊天室,裡邊的客戶端傳送訊息之後服務將其訊息轉發給其他客戶端,達到一個聊天室的效果。

  大致的思路:服務端區分職責,分成兩部分,主執行緒負責接收連線並把連線放入到執行緒池中處理,維護一個執行緒池,所有對於socket的處理都交給執行緒池中的執行緒來處理。如下圖。

  下面貼上demo程式碼(程式碼中有幾處為了方便並沒有採用最規範的定義方式,如執行緒池的建立和Map

初始化的時候未設定初始容量等)

  程式碼分五個類,服務端(ChatServer,監聽作用,為服務端主執行緒)、客戶端(ChatClient)、服務端處理器(ServerHandler,可以理解為執行緒池中要執行的事情)、客戶端處理器(ClientHandler,客戶端讀寫伺服器訊息的處理),工具類(SocketUtils,只有一個傳送訊息方法)。

1.2 demo程式碼

服務端:

/**
* 服務端啟動類
* 主要負責監聽客戶端連線
*/
public class ChatServer { public static void main(String[] args) {
ServerSocket serverSocket = null;
/*----------為了方便使用Executors建立執行緒-------------*/
ExecutorService handlerThreadPool = Executors.newFixedThreadPool(100);
try {
serverSocket = new ServerSocket(8888);
while (true) {
System.out.println("-----------阻塞等待連線------------");
Socket socket = serverSocket.accept();
String key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
System.err.println(key + "已連線");
// 主執行緒只接收,處理直接交給處理執行緒池
handlerThreadPool.execute(new ServerHandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
if (Objects.nonNull(serverSocket)) {
try {
serverSocket.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
} }

服務端處理類:


/**
* 服務端socket事件處理類
* 負責處理對應socket中的讀寫操作
*/
public class ServerHandler implements Runnable { /**
* 連線到服務端的所有連線 socket的地址埠->socket
*/
private static final Map<String, Socket> socketMap = new ConcurrentHashMap<>(); /**
* 維護名稱和地址的map
*/
private static final Map<String, String> nameMap = new ConcurrentHashMap<>(); private Socket socket; /**
* 每個socket的標識,使用地址+埠構成
*/
private String key; public ServerHandler() {
} public ServerHandler(Socket socket) {
this.socket = socket;
this.key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
} @Override
public void run() {
Socket s = socket;
// 根據訊息執行不同操作
InputStream inputStream;
// debug檢視資料用
// Map<String, Socket> tmpMap = socketMap;
try {
inputStream = s.getInputStream();
Scanner scanner = new Scanner(inputStream);
while (true) {
String line = scanner.nextLine();
if (line.startsWith("register")) {
// 登記
String[] split = line.split(":");
String name = split[1];
String msg;
// 校驗是否存在
if (socketMap.containsKey(key)) {
msg = "請勿重複登記";
sendMsg(s, msg);
return;
} if (nameMap.containsValue(name)) {
msg = "名稱已被登記,請換一個名稱";
sendMsg(s, msg);
return;
} // 通知自己已連線
sendMsg(s, "已連線到伺服器"); msg = name + "進入聊天室";
// 將訊息轉發給其他客戶端
sendMsgToClients(msg); // 放入socket池
socketMap.put(key, s);
nameMap.put(key, name);
System.err.println(name + "已登記");
} else if (line.trim().equalsIgnoreCase("end")) {
if (notPassRegisterValidate()) {
continue;
} // 斷開連線
socketMap.remove(key);
String name = nameMap.get(key);
String msg = name + "離開聊天室";
System.err.println(msg);
// 將訊息轉發給其他客戶端
sendMsgToClients(msg); msg = "已斷開連線";
// 傳送給對應的連線斷開資訊
sendMsg(s, msg);
inputStream.close();
break;
} else {
if (notPassRegisterValidate()) {
continue;
} // 正常通訊
String name = nameMap.get(key);
String msg = name + ":" + line;
// 將訊息轉發給其他客戶端
sendMsgToClients(msg);
}
}
} catch (IOException e) {
e.printStackTrace();
}
} /**
* 是否已登入校驗
*
* @return 是否已登入
*/
private boolean notPassRegisterValidate() {
boolean hasRegister = nameMap.containsKey(key);
if (hasRegister) {
return false;
} String msg = "您還未登入,請先登入";
sendMsg(socket, msg);
return true;
} /**
* 往連線傳送訊息
*
* @param socket 客戶端連線
* @param msg 訊息
*/
private void sendMsg(Socket socket, String msg) {
SocketUtils.sendMsg(socket, msg);
if (socket.isClosed()) {
socketMap.remove(key);
}
} /**
* 傳送給其他客戶端資訊
*
* @param msg 資訊
*/
private void sendMsgToClients(String msg) {
for (Map.Entry<String, Socket> entry : socketMap.entrySet()) {
if (this.key.equals(entry.getKey())) {
continue;
} sendMsg(entry.getValue(), msg);
}
} }

工具類(一個傳送訊息的方法):

public class SocketUtils {

    private SocketUtils() {
} public static void sendMsg(Socket socket, String msg) {
Socket s = socket;
OutputStream outputStream = null;
msg += "\r\n";
try {
outputStream = s.getOutputStream();
outputStream.write(msg.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
} catch (IOException e) {
System.err.println("傳送訊息失敗, 連線已斷開");
try {
if (Objects.nonNull(outputStream)) {
outputStream.close();
}
socket.close();
} catch (IOException ioException) {
ioException.printStackTrace();
} }
} }

客戶端:

/**
* 客戶端讀和寫各自使用一個執行緒
*/
public class ChatClient { public static void main(String[] args) {
Socket socket;
ExecutorService clientHandlerPool = Executors.newFixedThreadPool(2);
try {
socket = new Socket("localhost", 8888); // 寫執行緒
clientHandlerPool.execute(new ClientHandler(socket, 1));
// 讀執行緒
clientHandlerPool.execute(new ClientHandler(socket, 0)); } catch (IOException e) {
e.printStackTrace();
}
} }

客戶端處理器:

/**
* 客戶端處理器
* 根據type來區分是做讀工作還是寫工作
*/
public class ClientHandler implements Runnable { private Socket socket; /**
* 處理型別,0-讀、1-寫
*/
private int type; public ClientHandler() {
throw new IllegalArgumentException("不能使用沒有引數的建構函式");
} public ClientHandler(Socket socket, int type) {
this.socket = socket;
this.type = type;
} @Override
public void run() {
if (type == 1) {
// 進行寫操作
doWriteJob();
return;
} // 預設讀操作
doReadJob();
} /**
* 讀操作
*/
private void doReadJob() {
Socket s = socket;
InputStream inputStream;
try {
inputStream = s.getInputStream();
Scanner scanner = new Scanner(inputStream);
while (true) {
String line = scanner.nextLine();
if (null != line && !"".equals(line)) {
System.err.println(line);
}
// 如果已退出了,那麼關閉連線
if ("已斷開連線".equals(line)) {
socket.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();
try {
socket.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
} /**
* 寫執行緒
*/
private void doWriteJob() {
Socket s = socket;
try {
Scanner scanner = new Scanner(System.in);
while (true) {
String output = scanner.nextLine();
if (Objects.nonNull(output) && !"".equals(output)) {
SocketUtils.sendMsg(s, output);
}
}
} catch (Exception e) {
e.printStackTrace();
System.err.println("錯誤發生了:" + e.getMessage());
}
}
}

結果:

思考:當前這樣實現有什麼瓶頸,可能會出現什麼問題?

存在問題:

  1. 服務端使用accept阻塞接收執行緒,連線一個一個處理,在高併發下處理效能緩慢
  2. 沒有連線的時候執行緒一直處於阻塞狀態造成資源的浪費(如果使用多執行緒接收處理併發,那麼沒連線的時候造成多個執行緒的資源浪費)。

2. 使用NIO實現聊天室

2.1 整體思路

  那我們來看下NIO是怎麼解決上方的問題的,首先上這個demo整體的架構圖。

  大概的邏輯為

  1. 服務端將ServerSocketChannel註冊到Selector中,客戶端連線進來的時候事件觸發,將客戶端的連線註冊到selector中。
  2. 主執行緒負責selector的輪詢工作,發現有事件可以處理就將其交給執行緒池
  3. 客戶端同理分成兩個部分,寫操作和讀操作,每個操作由一個執行緒單獨完成;但是如果讀操作處理使用while迴圈不斷輪詢等待接收的話,CPU會飆升,所以需要客戶端新建一個selector來解決這個問題,注意這個selector跟服務端不是同一個,沒有啥關係。

  程式碼分類大致跟IO寫法一樣,分成服務端、服務端處理器、客戶端、客戶端處理器,下面為demo。

2.2 程式碼

服務端:

public class ChatServer {

    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    private static final ExecutorService handlerPool = Executors.newFixedThreadPool(100);

    public ChatServer() throws IOException {
this.selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(9999));
// 將服務端的socket註冊到selector中,接收客戶端,並將其註冊到selector中,其本身也是selector中的一個I/O事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.err.println("聊天室服務端初始化結束");
} /**
* 啟動方法
* 1.監聽,拿到之後進行處理
*/
public void start() throws IOException {
int count;
while (true) {
// 可能出現select方法沒阻塞,空輪詢導致死迴圈的情況
count = selector.select(); if (count > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 交給執行緒池處理
handlerPool.execute(new ServerHandler(key, selector));
// 處理完成後移除
iterator.remove();
}
}
}
} public static void main(String[] args) throws IOException {
new ChatServer().start();
}
}

服務端處理器:


public class ServerHandler implements Runnable { private SelectionKey key; private Selector selector; public ServerHandler() { } /**
* 本來可以通過key拿到selector,這裡為了圖方便就這樣寫了
*/
public ServerHandler(SelectionKey key, Selector selector) {
this.key = key;
this.selector = selector;
} @Override
public void run() {
try {
if (key.isAcceptable()) {
// 說明是服務端的事件,注意這裡強轉換為的是ServerSocketChannel
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 接收連線
SocketChannel socket = channel.accept();
if (Objects.isNull(socket)) {
return;
} socket.configureBlocking(false);
// 接收客戶端的socket並且將其註冊到服務端這邊的selector中,注意客戶端在此時跟服務端selector產生關聯
socket.register(selector, SelectionKey.OP_READ);
System.err.println("服務端已接收連線");
} else if (key.isReadable()) {
// 客戶端傳送資訊過來了
doReadJob();
}
} catch (IOException e) {
e.printStackTrace();
// 錯誤處理
}
} /**
* 讀取操作
*/
private void doReadJob() throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readCount = socketChannel.read(buffer);
if (readCount > 0) {
String msg = new String(buffer.array(), StandardCharsets.UTF_8);
System.err.println(socketChannel.getRemoteAddress().toString() + "的資訊為:" + msg); // 轉發給其他客戶端
sendMsgToOtherClients(msg);
}
} /**
* 轉發訊息給其他客戶端
*
* @param msg 訊息
*/
private void sendMsgToOtherClients(String msg) throws IOException { SocketChannel self = (SocketChannel) key.channel(); Set<SelectionKey> keys = selector.keys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
SelectableChannel channel = selectionKey.channel();
// 如果是本身或者不是socketChannel型別則跳過
if (self.equals(channel) || channel instanceof ServerSocketChannel) {
continue;
} SocketChannel socketChannel = (SocketChannel) channel;
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
socketChannel.write(byteBuffer);
}
}
}

客戶端:


public class ChatClient { private Selector selector; private SocketChannel socketChannel; private static ExecutorService dealPool = Executors.newFixedThreadPool(2); public ChatClient() throws IOException { /*
* 說明一下:
* 客戶端這邊的selector跟剛才在服務端定義的selector是不同的兩個selector
* 客戶端這邊不需要selector也能實現功能,但是讀取的時候必須不斷的迴圈,會導致CPU飆升,
* 所以使用selector是為瞭解決這個問題的,別跟服務端的selector搞混就好
*/
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} public void start() throws IOException, InterruptedException {
// 連線
// socketChannel.connect(new InetSocketAddress("localhost", 9999));
while (!socketChannel.finishConnect()) {
System.err.println("正在連線...");
TimeUnit.MILLISECONDS.sleep(200);
} System.err.println("連線成功"); // 使用兩個執行緒來分別處理讀取和寫操作
// 寫資料
dealPool.execute(new ClientHandler(selector, socketChannel, 1)); // 讀取資料
dealPool.execute(new ClientHandler(selector, socketChannel, 0));
} public static void main(String[] args) throws IOException, InterruptedException {
new ChatClient().start();
}
}

客戶端處理器:


public class ClientHandler implements Runnable { private Selector selector; private SocketChannel socketChannel; /**
* 0-讀,1-寫
*/
private int type; public ClientHandler() {
} public ClientHandler(Selector selector, SocketChannel socketChannel, int type) {
// selector是為瞭解決讀時候CPU飆升的問題,具體見客戶端的啟動類程式碼註釋
this.selector = selector;
this.socketChannel = socketChannel;
this.type = type;
} @Override
public void run() {
try {
if (type == 0) {
doClientReadJob();
return;
} doClientWriteJob();
} catch (IOException e) {
e.printStackTrace();
}
} /**
* 寫操作
*/
private void doClientWriteJob() throws IOException {
SocketChannel sc = socketChannel;
Scanner scanner = new Scanner(System.in);
while (true) {
if (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (null != line && !"".equals(line)) {
ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
sc.write(buffer);
}
}
}
} /**
* 讀操作
*/
private void doClientReadJob() throws IOException {
SocketChannel sc = socketChannel;
ByteBuffer buf = ByteBuffer.allocate(1024);
while (true) {
int select = selector.select();
if (select > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
// 這是必須的,不然下方的remove會出錯
SelectionKey next = iterator.next();
// 這裡因為只有本身這個客戶端註冊到客戶端的selector中,所以有事件一定是它的,也就不用從key拿了,直接操作就行
buf.clear();
int read = sc.read(buf);
if (read > 0) {
String msg = new String(buf.array(), StandardCharsets.UTF_8);
System.err.println(msg);
}
// 事件處理完之後要移除這個key,否則的話selector.select()方法不會再讀到這個key,即便有新的時間到這個channel來
iterator.remove();
}
}
}
} }

  結果圖:

在編寫的過程中發現了以下兩點:

  1. select方法之後如果存在key,並且接下來的操作未對這個selectionKeyremove操作,那麼下次的select不會再將其選入,即便有事件發生,也就是說,select方法不會選擇之前已經選過的key。
  2. selector.select()方法中偶爾會出現不阻塞的情況。這就是NIO中的空輪詢bug,也就是說,沒有連線又不阻塞的話,while(true) ... 的寫法就是一個死迴圈,會導致CPU飆升。

  第二點問題在NIO框架(如netty)中都採用了比較好的解決方法,可以去查下如何解決的。接下來看下NIO的寫法是否解決了IO寫法中存在的問題:

  1. 服務端使用accept阻塞接收執行緒,連線一個一個處理,在高併發下處理效能緩慢。

    答:上述寫法中還是使用一個ServerSocketChannel來接收客戶端,沒有解決這個問題;但是可以通過使用執行緒池的方式來解決。也就是說將服務端的事件分成兩個部分,第一個部分為接收客戶端,使用一個執行緒池來維護;第二個部分為客戶端的事件處理操作,也維護一個執行緒池來執行這些事件。

      這樣效能上去了,由於selector的存在也不會出現資源浪費的事情netty就是這麼做的哦。

  2. 沒有連線的時候執行緒一直處於阻塞狀態造成資源的浪費(如果使用多執行緒接收處理併發,那麼沒連線的時候造成多個執行緒的資源浪費)。

    答:解決。NIO寫法主要有selector不斷輪詢,不會出現沒連線不作為的情況,而且多個連線的話也沒有問題(參考1的回答)。

3. 小結

  兩種寫法都有Reactor模式的影子,但是IO寫法有明顯的缺點就是如果沒有連線會造成資源浪費的問題(採用多個接收連線的話更甚),而NIO中selector輪詢機制就很好的解決了無連線時無作為的情況,並且在效能方面可以通過職責分類和執行緒池來得到改善,所以,我選擇NIO。

需要壓力,需要努力。