1. 程式人生 > >利用java nio 實現簡單的訊息通訊

利用java nio 實現簡單的訊息通訊

         在分散式java應用中,經常需要在各個子系統間進行通訊與資料交換。在java領域要實現這樣的功能有很多途徑,下面我將使用nio+tcp/ip這種基於訊息機制的方式來實現這樣的功能。利用nio的非阻塞模式以及選擇器機制能夠很大程度上的提高程式的效能及吞吐量。利用執行緒池能夠方便的做到一請求一執行緒,在實際環境中這是非常有作用的,真實的應用場景往往是連線數可能會很多,但是同一時間向伺服器傳送的請求會遠遠小於實際的連線數。

/**
 *NIO tcp/ip客戶端
 */
public class Client {

	public static void main(String[] args) throws IOException {
		SocketChannel sc=SocketChannel.open();
		sc.configureBlocking(false);//需在sc.connect之前設定
		sc.connect(new InetSocketAddress("127.0.0.1",9999));
		Selector selector=Selector.open();
		sc.register(selector, SelectionKey.OP_CONNECT);
		while(true){
			if(sc.isConnected()){
				BufferedReader read=new BufferedReader(new InputStreamReader(System.in));
				int writeBytes=sc.write(Charset.forName("UTF-8").encode(read.readLine()));
				if(writeBytes==0){
					sc.register(selector, SelectionKey.OP_WRITE);//註冊寫事件(當寫緩衝區滿時)
				}
			}
			int selectKeys=selector.select();
			if(selectKeys==0){
				continue;
			}
			for(SelectionKey key:selector.selectedKeys()){
				if(key.isConnectable()){
					SocketChannel socketChanel=(SocketChannel)key.channel();
					if(socketChanel==null){
						continue;
					}
					socketChanel.configureBlocking(false);
					socketChanel.register(selector,SelectionKey.OP_READ);
					socketChanel.finishConnect();
				}else if(key.isReadable()){
					SocketChannel socketChanel=(SocketChannel)key.channel();
					ByteBuffer bf=ByteBuffer.allocate(1024);
					while(socketChanel.read(bf)>0){
						bf.flip();
						System.out.println(Charset.forName("UTF-8").decode(bf).toString());
						bf.clear();
					}
				}else if(key.isWritable()){
					//只要寫緩衝區未滿就一直會產生寫事件,如果此時又不寫資料時,會產生不必要的資源損耗,所以這裡需要取消寫事件以免cpu消耗100%
					//寫資料,如果寫緩衝區滿時繼續註冊寫事件key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
					key.interestOps(key.interestOps()&(~SelectionKey.OP_WRITE));
					
				}
			}
			selector.selectedKeys().clear();
		}
	}

/**
 *NIO tcp/ip 伺服器端
 */
public class Server {
	final static int PORT = 9999;
	//處理請求的執行緒池
	final static ExecutorService workThreadsPool = Executors
			.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 5 / 8 + 3);
	public static void main(String[] args) throws IOException,
			InterruptedException {
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.socket().bind(new InetSocketAddress("127.0.0.1", PORT));
		ssc.configureBlocking(false);
		Selector selector = Selector.open();
		ssc.register(selector, SelectionKey.OP_ACCEPT);
		while (true) {
			int selectorKeys = selector.select(1000L);
			if (selectorKeys == 0) {
				continue;
			}
			for (SelectionKey selectKey : selector.selectedKeys()) {
				if (selectKey.isAcceptable()) {
					ServerSocketChannel serverSocketChanel = (ServerSocketChannel) selectKey
							.channel();
					SocketChannel sc = serverSocketChanel.accept();
					// 因為是採用的非阻塞模式,所以當沒有連線時以上方法也為立即返回,只是返回的值為null
					if (sc == null) {
						continue;
					}
					System.out.println("accept a quest");
					sc.configureBlocking(false);
					sc.register(selector, SelectionKey.OP_READ);
				} else if (selectKey.isReadable()) {
					// 此處通過執行緒池實現了一請求一執行緒的機制,該機制的實現利用了NIO的非阻塞模式與通道選擇器機制
					// 相比以前BIO+THREADPOOL實現的一連線一執行緒的吞吐量更高,併發性更強
					HandlerRequestWork handlerRequest = new HandlerRequestWork(
							"Request handler");
					workThreadsPool.execute(handlerRequest);
					handlerRequest.handler(selectKey);
				}
			}
			selector.selectedKeys().clear();
		}
	}

	static class HandlerRequestWork extends Thread {
		private SelectionKey key;
		private final Lock lock = new ReentrantLock();
		private final Condition preparedSingle = lock.newCondition();

		public HandlerRequestWork(String threadName) {
			super(threadName);
		}

		public void handler(SelectionKey key) {
			lock.lock();
			try {
				preparedSingle.signalAll();
				this.key = key;
				// 在緩衝區資料未處理完成時,下一次輪詢selector.select(1000L)時依然會觸發readable事件,
				// 所以這裡避免對同一次請求進行多次處理需要取消readable這一感興趣事件 ,當處理完成後再註冊該事件
				key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
			} finally {
				lock.unlock();
			}
		}

		/*
		 * (non-Javadoc)
		 * 
		 * @see java.lang.Thread#run()
		 */
		@Override
		public void run() {

			lock.lock();
			try {
				if (key == null) {
					try {
						preparedSingle.await();
					} catch (InterruptedException e) {
						logger.admin(e.getMessage(), e);
						throw new RuntimeException(e);
					}
				}
				if (key == null) {
					return;
				}
				SocketChannel sc = (SocketChannel) key.channel();
				ByteBuffer bf = ByteBuffer.allocate(1024);
				try {
					while (sc.read(bf) > 0) {
						bf.flip();
						System.out.println(Charset.forName("UTF-8").decode(bf)
								.toString());
						bf.clear();
					}
					if (bf != null) {
						bf.clear();
						bf = null;
					}
					sc.write(Charset.forName("UTF-8").encode(
							"got messages from client=" + sc));
					key.interestOps(key.interestOps() | SelectionKey.OP_READ);
				} catch (IOException e) {
					logger.admin(e.getMessage(), e);
					try {
						sc.close();// 關閉發生發生異常的連線,並取消註冊SelectionKey
					} catch (IOException e1) {
						logger.admin(e1.getMessage(), e1);
					}
					key.cancel();
				}

			} finally {
				lock.unlock();

			}
		}
	}