1. 程式人生 > >java NIOSocket 通訊簡要舉例

java NIOSocket 通訊簡要舉例

package socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

/**
 * 
 * NIOClient 
* * @author 王俊偉
[email protected]
* @date 2016年7月21日 下午5:26:25 */ public class NIOClient { private static final int SIZE = 1024; private static NIOClient instance = new NIOClient(); public String IP = "127.0.0.1";// 10.50.200.120 public int CLIENT_PORT = 4444;// 4444 9666 private SocketChannel channel; private Selector selector = null; String encoding = System.getProperty("file.encoding"); Charset charset = Charset.forName(encoding); private NIOClient() { } public static NIOClient getInstance() { return instance; } public void send(String content) throws IOException { selector = Selector.open(); channel = SocketChannel.open(); // channel = SocketChannel.open(new InetSocketAddress(IP,CLIENT_PORT)); InetSocketAddress remote = new InetSocketAddress(IP, CLIENT_PORT); channel.connect(remote); // 設定該sc以非阻塞的方式工作 channel.configureBlocking(false); // 將SocketChannel物件註冊到指定的Selector // SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT channel.register(selector, SelectionKey.OP_READ);//這裡註冊的是read讀,即從服務端讀資料過來 // 啟動讀取伺服器資料端的執行緒 new ClientThread().start(); channel.write(charset.encode(content)); // 建立鍵盤輸入流 Scanner scan = new Scanner(System.in);//這裡向服務端傳送資料,同時啟動了一個鍵盤監聽器 while (scan.hasNextLine()) { System.out.println("輸入資料:\n"); // 讀取鍵盤的輸入 String line = scan.nextLine(); // 將鍵盤的內容輸出到SocketChanenel中 channel.write(charset.encode(line)); } scan.close(); } /** * 從服務端讀入資料的執行緒
* * @author 王俊偉
[email protected]
* @date 2016年10月20日 下午9:59:11 */ private class ClientThread extends Thread { @Override public void run() { try { while (selector.select() > 0) { // 遍歷每個有可能的IO操作的Channel對銀行的SelectionKey for (SelectionKey sk : selector.selectedKeys()) { // 刪除正在處理的SelectionKey selector.selectedKeys().remove(sk); // 如果該SelectionKey對應的Channel中有可讀的資料 if (sk.isReadable()) { // 使用NIO讀取Channel中的資料 SocketChannel sc = (SocketChannel) sk.channel(); String content = ""; ByteBuffer bff = ByteBuffer.allocate(SIZE); while (sc.read(bff) > 0) { sc.read(bff); bff.flip(); content += charset.decode(bff); } // 列印讀取的內容 System.out.println("服務端返回資料:" + content); // 處理下一次讀 sk.interestOps(SelectionKey.OP_READ); } } } } catch (IOException io) { io.printStackTrace(); } } } /** * TCP 處理 執行緒
*/ class TCPClientReadThread implements Runnable { private Selector selector; public TCPClientReadThread(Selector selector) { this.selector = selector; new Thread(this).start(); } @Override public void run() { try { channel.configureBlocking(false); // selector.select(3000); channel.register(selector, SelectionKey.OP_READ); while (true) { if (selector.select(1000) > 0) { // 遍歷每個有可用IO操作Channel對應的SelectionKey for (SelectionKey sk : selector.selectedKeys()) { // 如果該SelectionKey對應的Channel中有可讀的資料 if (sk.isReadable()) { // 使用NIO讀取Channel中的資料 SocketChannel sc = (SocketChannel) sk.channel(); // 將位元組轉化為為UTF-8的字串 receiveData(sc); // 為下一次讀取作準備 sk.interestOps(SelectionKey.OP_READ); } else if (sk.isWritable()) { // 取消對OP_WRITE事件的註冊 ByteBuffer buffer = ByteBuffer.allocate(1024); sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE)); SocketChannel sc = (SocketChannel) sk.channel(); // 此步為阻塞操作,直到寫入作業系統傳送緩衝區或者網路IO出現異常 // 返回的為成功寫入的位元組數,若緩衝區已滿,返回0 int writeenedSize = sc.write(buffer); // 若未寫入,繼續註冊感興趣的OP_WRITE事件 if (writeenedSize == 0) { sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } } else if (sk.isConnectable()) { SocketChannel sc = (SocketChannel) sk.channel(); sc.configureBlocking(false); // 註冊感興趣的IO事件,通常不直接註冊寫事件,在傳送緩衝區未滿的情況下 // 一直是可寫的,所以如果註冊了寫事件,而又不寫資料,則很容易造成CPU消耗100% // SelectionKey sKey = sc.register(selector, // SelectionKey.OP_READ); // 完成連線的建立 sc.finishConnect(); } // 刪除正在處理的SelectionKey selector.selectedKeys().remove(sk); } } if (selector.select(1000) <= 0) { Thread.sleep(1000); continue; } } } catch (Exception ex) { ex.printStackTrace(); } } } /** * 客戶端傳送資料 * * @param channel * @param bytes * @throws Exception */ protected void sendData(SocketChannel channel, byte[] bytes) throws Exception { ByteBuffer buffer = ByteBuffer.wrap(bytes); channel.write(buffer); //channel.socket().shutdownOutput(); } protected void sendData(SocketChannel channel, String data) throws Exception { this.sendData(channel, data.getBytes()); } /** * 接受服務端的資料 * * @param channel * @return * @throws Exception */ protected void receiveData(SocketChannel channel) throws Exception { ByteBuffer buffer = ByteBuffer.allocateDirect(1024); int count = 0; while ((count = channel.read(buffer)) != -1) { if (count == 0) { Thread.sleep(100); // 等等一下 continue; } // 轉到最開始 buffer.flip(); while (buffer.remaining() > 0) { System.out.print((char) buffer.get()); } buffer.clear(); } } public static void main(String[] args) { try { NIOClient nio = new NIOClient(); nio.send("test\n");//向服務端傳送資料 //nio.send("metrics:memory: swap: cpu: network i/o: disks i/o: tcp:\n"); } catch (IOException e) { e.printStackTrace(); } } }