1. 程式人生 > >使用javaNIO實現C/S模式的通訊

使用javaNIO實現C/S模式的通訊

NIO使用非阻塞IO的方式實現,伺服器與客戶端的交流,適用於大量連線,而資料量少的情況。通過一個執行緒輪詢所有的通道,處理註冊的事件,而主執行緒可以繼續幹其他的事情。這樣所有的I/O都交給一個執行緒處理,減少了執行緒IO的切換。如果具體學習NIO的架構和原理請點選下面的連線

點選開啟連結 http://ifeve.com/selectors/

以下為一個使用NIO實現的C/S通訊模式,對應簡單例子學習,可以更容易入手。

服務端程式碼如下:

主執行緒類:MainChannel

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class MainChannel {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		// 宣告伺服器監聽通道 和select選擇器
		ServerSocketChannel serverChannel = null;
		Selector selector = null;

		try {
			// 例項化selector 此種例項化模式說明 selector 是單例的
			selector = Selector.open();
			// 例項化伺服器監聽埠
			serverChannel = ServerSocketChannel.open();
			// 繫結監聽地址。
			serverChannel.socket().bind(new InetSocketAddress(8881));
			// 設定channel為非阻塞模式,一定要非阻塞模式才能註冊到selector中
			serverChannel.configureBlocking(false);

			// 把監聽通道註冊到選擇器中, 監聽此通道的連線事件。SelectionKey.OP_ACCEPT 指定為連線事件。
			serverChannel.register(selector, SelectionKey.OP_ACCEPT);
			//所有通道,交給通道管理器,統一管理
			ChannelManager.addChannel(serverChannel);
			// 開啟一個執行緒負責管理selector,並輪詢是否有註冊監聽的事件就緒。
			Thread thread = new Thread(new ServerNIO(selector));
			thread.start();

			// 然後主執行緒 就可以幹其他的事情了。不管客戶端連線 還是I/O
			// 都不會阻塞此執行緒,只會阻塞selector管理執行緒,且只在等待事件發生時阻塞。
		} catch (IOException e) {
			// TODO Auto-generated catch block
			System.out.println("伺服器監聽發生異常");
		}
	}
}
管理selector的執行緒類:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ServerNIO implements Runnable {
	// 客戶端通道,用於給某個客戶端收發資料
	private SocketChannel socketChannel;

	// 緩衝區,使用者收發資料,面向通道。
	private ByteBuffer buf = ByteBuffer.allocate(1024);

	// 選擇器,從主執行緒注入
	private Selector selector;

	// 指定執行緒是否輪詢
	private boolean flag = true;

	// 構造器中注入selector
	public ServerNIO(Selector selector) {
		// TODO Auto-generated constructor stub
		this.selector = selector;
	}

	// 開啟執行緒等待事件的發生,輪詢通道。
	@Override
	public void run() {
		// TODO Auto-generated method stub
		try {
			while (flag) {

				/**
				 * 獲取等待的事件就緒的通道,如果沒有通道有事件就緒有三種情況 
				 * 1. 直接返回:selectNow(); 
				 * 2. 超時返回:select(int timeout); 
				 * 3. 阻 塞: select();
				 */
				int nums = selector.select(); // 阻塞模式
				if (nums > 0) { // 阻塞模式下 此次判定多餘
					// 當事件發生了,獲取發生事件通道的key集合;
					Set<SelectionKey> selectKeys = selector.selectedKeys();

					// 迭代遍歷這個keys集合
					Iterator<SelectionKey> iter = selectKeys.iterator();

					while (iter.hasNext()) {
						// 獲取單個通道的key
						SelectionKey key = iter.next();
						// 如果是讀取事件就緒。說明有客戶端向伺服器傳送資料了。
						if (key.isReadable()) {
							// 先獲取到客戶端channel
							socketChannel = (SocketChannel) key.channel();
							buf.clear();
							// 利用buffer讀取資料。
							int len = socketChannel.read(buf);
							if (len > 0) {
								byte[] str = new byte[len];
								buf.rewind();
								buf.get(str, 0, len);
								buf.clear();
								System.out.println("獲取客戶端資料:" + new String(str));

								// 給客戶端回覆資料
								String temp = "伺服器回覆: 已經收到您傳送的資料,祝您一路平安!";
								buf.clear();
								buf.put(temp.getBytes());
								buf.flip();
								socketChannel.write(buf);
								System.out.println("已經向客戶端回覆");
								//此處可以利用ChannelManager向其他所有通道廣播資料。只要在ChannelManager中寫一個廣播方法即可
							}
						} else if (key.isAcceptable()) { // 如果是接受客戶端連線就緒
							// 從key中獲取對應的通道
							ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
							// 接受這個連線。
							SocketChannel socketChannel = serverChannel.accept();
							// 如果連線不為空,則控制檯列印 連線的客戶端地址。
							if (socketChannel != null) {
								// 由於關閉selector的時候,並不會關閉通道,最好使用一個容器,將通道都儲存起來
								//然後開啟心跳連線,如果通道出現異常則關閉此通道,當應用程式關閉的時候,關閉所有的通道。
								ChannelManager.addChannel(socketChannel);
								
								System.out.println(String.format("接收到客戶端的連線:IP[%s]-埠[%s]",
										socketChannel.socket().getPort(), socketChannel.socket().getInetAddress()));
								// 把這個通道設定為非阻塞模式,然後又註冊到selector中,等待事件發生
								socketChannel.configureBlocking(false);
								socketChannel.register(selector, SelectionKey.OP_READ);
							}
						} //寫事件在緩衝區直接達到閾值時候出發,一般不註冊寫事件。
						// 通道處理完畢後 將此通道刪除。如果下次又有此時間,會有一個新的key,所以記得刪除處理過的key。
						iter.remove();
					}

				}
			}
		} catch (IOException e) {
			System.out.println("伺服器斷開連線");
		}

		try {
			// 注意此處只會關閉,selector使註冊到琪上面的selectionKeys無效,通道本身不會關閉。
			selector.close();
			ChannelManager.closeChannles();
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
	}
}
通道管理類:

import java.io.IOException;
import java.nio.channels.Channel;
import java.util.LinkedList;

public class ChannelManager {
	private static LinkedList<Channel> list = new LinkedList<>();
	
	private static Thread thread; //用於開啟心跳測試通道連線,實現省略

	public static void addChannel(Channel channel) {
		list.add(channel);
	}

	//關閉所有的通道連線
	public static void closeChannles() {
		for (Channel channel : list) {
			try {
				channel.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
				System.out.println("關閉通道失敗");
			}
			list.remove();
		}
	}
}

一個簡單的客戶端類: 這個指令碼可以直接通過cmd 編譯執行。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class MainChannel {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		SocketChannel socketChannel = null;
		boolean flag = true;
		Scanner in = new Scanner(System.in);
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		try {
			socketChannel = SocketChannel.open();

			// 設定連線為非阻塞模式,可能未連線完成就返回
			// socketChannel.configureBlocking(false);
			socketChannel.socket().connect(new InetSocketAddress("192.168.1.100", 8881));

			// 判斷是否連線成功,等待連線成功
			while (!socketChannel.finishConnect()) {
			}

			while (flag) {

				String temp = in.nextLine();

				buffer.clear();

				buffer.put(temp.getBytes());

				// limit指標 移動到 position位置
				buffer.flip();

				// 當buffer中有足夠空間,則寫到buffer中
				while (buffer.hasRemaining())
					socketChannel.write(buffer);

				if ("exit".equals(temp))
					flag = false;
			}

		} catch (IOException e) {
			// TODO Auto-generated catch block
			System.out.println("與服務斷開連線");
		}

		if (socketChannel != null) {
			try {
				socketChannel.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

}

上述程式碼,可以實現客戶端向伺服器傳送訊息,然後伺服器接受到訊息,在自己的控制檯列印輸出。