Java 非阻塞通訊的例子
阿新 • • 發佈:2018-12-30
// ---------------------------------------------------------------------------
// File 1 of 3: EchoClient1.java
// ---------------------------------------------------------------------------
package 建立非阻塞的EchoClient;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * Non-blocking echo client.
 *
 * <p>One thread ({@link #LocalRead()}) reads lines from standard input and queues them in
 * {@code sendBuffer}; another thread ({@link #talk()}) runs a selector loop that writes the
 * queued bytes to the server and prints the lines the server sends back. Access to
 * {@code sendBuffer} is guarded by synchronizing on the buffer itself; {@code receiveBuffer}
 * is only touched by the selector thread.
 */
public class EchoClient1 {
    /** Line terminator used by the echo protocol. */
    private static final String LINE_END = "\r\n";

    /** Outgoing bytes; kept in "write mode" (position = end of data) between operations. */
    private final ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    /** Incoming bytes; kept in "write mode" (position = end of data) between operations. */
    private final ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
    private int port = 8000;
    private SocketChannel socketChannel;
    private Selector selector;
    private Charset charset = Charset.forName("GBK");

    /**
     * Connects (blocking) to port 8000 on the local host, then switches the channel to
     * non-blocking mode and opens the selector.
     *
     * @throws UncheckedIOException if the connection or the selector cannot be set up; the
     *         original code only printed the stack trace and left a half-initialised object
     *         that failed later with a NullPointerException
     */
    public EchoClient1() {
        try {
            socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            socketChannel.configureBlocking(false);
            System.out.println("客戶端已經與伺服器建立連線!");
            selector = Selector.open();
        } catch (IOException e) {
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException suppressed) {
                    e.addSuppressed(suppressed);
                }
            }
            throw new UncheckedIOException("Cannot connect to echo server on port " + port, e);
        }
    }

    /**
     * Reads lines from standard input and queues each one, terminated by CRLF, for sending.
     * Stops at end of input or after the line {@code bye} has been queued.
     *
     * @throws IOException if standard input cannot be read, or if the connection is closed
     *         while queued data is still waiting for buffer space
     */
    public void LocalRead() throws IOException {
        BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
        String msg;
        while ((msg = localReader.readLine()) != null) {
            enqueue(charset.encode(msg + LINE_END));
            if (msg.equals("bye")) {
                break;
            }
        }
    }

    /**
     * Copies {@code encoded} into {@code sendBuffer} in chunks that fit the free space, so a
     * long line no longer triggers a BufferOverflowException. Yields between chunks while the
     * selector thread drains the buffer.
     */
    private void enqueue(ByteBuffer encoded) throws IOException {
        while (encoded.hasRemaining()) {
            synchronized (sendBuffer) {
                int chunk = Math.min(encoded.remaining(), sendBuffer.remaining());
                if (chunk > 0) {
                    ByteBuffer part = encoded.duplicate();
                    part.limit(part.position() + chunk);
                    sendBuffer.put(part);
                    encoded.position(encoded.position() + chunk);
                }
            }
            if (encoded.hasRemaining()) {
                // Nobody will ever drain the buffer once the channel is closed.
                if (!socketChannel.isOpen()) {
                    throw new ClosedChannelException();
                }
                Thread.yield();
            }
        }
    }

    /**
     * Runs the selector loop: registers the channel for read and write readiness and
     * dispatches to {@link #Receive(SelectionKey)} and {@link #Send(SelectionKey)}.
     *
     * <p>NOTE(review): OP_WRITE stays registered permanently, so select() is expected to
     * return almost immediately and this loop spins; kept as in the original design.
     */
    public void talk() {
        try {
            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            while (selector.select() > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    // isValid() guards against a key cancelled by the previous handler.
                    if (key.isValid() && key.isReadable()) {
                        Receive(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        Send(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes as much of the queued data as the channel accepts and keeps the rest.
     *
     * @param key selection key whose channel is the server connection
     * @throws IOException if the write fails
     */
    void Send(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        synchronized (sendBuffer) {
            sendBuffer.flip();
            channel.write(sendBuffer);
            sendBuffer.compact();
        }
    }

    /**
     * Reads available bytes, prints every complete CRLF-terminated line and keeps any trailing
     * partial line for the next call. Disconnects and exits the JVM when the server closes the
     * connection or when the line {@code echo:bye} arrives.
     *
     * <p>A line longer than the 1024-byte buffer can never complete; that limitation is
     * inherited from the original.
     *
     * @param key selection key whose channel is the server connection
     * @throws IOException if reading or closing fails
     */
    void Receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.read(receiveBuffer) < 0) {
            // End of stream: the server closed the connection.
            disconnect(key);
            return;
        }
        receiveBuffer.flip();
        // Decode a duplicate so receiveBuffer keeps its own position and limit.
        String pending = charset.decode(receiveBuffer.duplicate()).toString();

        int consumedBytes = 0;
        boolean sawBye = false;
        int lineStart = 0;
        int lineEnd;
        while ((lineEnd = pending.indexOf(LINE_END, lineStart)) != -1) {
            String outputData = pending.substring(lineStart, lineEnd + LINE_END.length());
            System.out.println(outputData);
            consumedBytes += charset.encode(outputData).remaining();
            lineStart = lineEnd + LINE_END.length();
            // The extracted line includes its terminator, so compare with it included.
            if (outputData.equals("echo:bye" + LINE_END)) {
                sawBye = true;
                break;
            }
        }

        // Drop the consumed bytes and return the buffer to write mode.
        receiveBuffer.position(Math.min(consumedBytes, receiveBuffer.limit()));
        receiveBuffer.compact();

        if (sawBye) {
            disconnect(key);
        }
    }

    /** Cancels the key, closes the channel and the selector, and terminates the JVM. */
    private void disconnect(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("與伺服器斷開連線");
        selector.close();
        System.exit(0);
    }

    public static void main(String[] args) {
        final EchoClient1 client = new EchoClient1();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    client.LocalRead();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        client.talk();
    }
}

// ---------------------------------------------------------------------------
// File 2 of 3: EchoServer.java (mixed mode)
// ---------------------------------------------------------------------------
package 混合模式;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.net.*;

/**
 * Mixed-mode echo server: the main thread receives and sends data (non-blocking), while a
 * separate thread accepts connections (blocking).
 */
public class EchoServer {
    /** Line terminator used by the echo protocol. */
    private static final String LINE_END = "\r\n";

    private Selector selector = null;
    private ServerSocketChannel serverSocketChannel = null;
    private int port = 8000;
    private Charset charset = Charset.forName("GBK");
    /** Lock that keeps the service thread out of select() while a channel is being registered. */
    private final Object gate = new Object();

    /**
     * Opens the selector and binds the listening socket to port 8000.
     *
     * @throws UncheckedIOException if the server cannot be started
     */
    public EchoServer() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // SO_REUSEADDR must be set before bind; setting it afterwards has undefined effect.
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("伺服器已經啟動!");
        } catch (IOException e) {
            closeOnFailure(serverSocketChannel, e);
            closeOnFailure(selector, e);
            throw new UncheckedIOException("Cannot start echo server on port " + port, e);
        }
    }

    /** Closes {@code resource} if non-null, attaching any close failure to {@code cause}. */
    private static void closeOnFailure(Closeable resource, IOException cause) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException suppressed) {
            cause.addSuppressed(suppressed);
        }
    }

    /**
     * Accept loop, meant to run on its own thread. Each accepted connection is switched to
     * non-blocking mode and registered for read and write readiness, with a fresh 1024-byte
     * buffer attached. The loop ends when an IOException occurs.
     */
    public void accept() {
        try {
            while (true) {
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println(
                        "接受來自:" + socketChannel.socket().getInetAddress()
                                + "埠:" + socketChannel.socket().getPort());
                socketChannel.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                synchronized (gate) {
                    // Kick the service thread out of select() so it parks on the gate
                    // until the registration below has finished.
                    selector.wakeup();
                    socketChannel.register(
                            selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Service loop for the main thread: waits for ready keys and dispatches to
     * {@link #receive(SelectionKey)} and {@link #send(SelectionKey)}. An I/O failure on one
     * connection closes only that connection.
     */
    public void service() {
        while (true) {
            // Even if this thread gets here first and blocks in select(), the accept thread
            // wakes the selector once it enters its synchronized block. While the accept
            // thread is registering a channel, this thread waits here until it is done.
            synchronized (gate) { }

            int n;
            try {
                n = selector.select();
            } catch (IOException e) {
                e.printStackTrace();
                continue;
            }
            if (n == 0) {
                continue;
            }

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                try {
                    // isValid() guards against a key cancelled by the previous handler.
                    if (key.isValid() && key.isReadable()) {
                        receive(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        send(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * Reads up to 32 bytes from the client and appends them to the buffer attached to the key.
     * Never reads more than the attached buffer can hold, and closes the connection when the
     * client has closed its end.
     *
     * @param key selection key of a client connection with a ByteBuffer attachment
     * @throws IOException if reading or closing fails
     */
    public void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        buffer.limit(buffer.capacity());
        // Cap the chunk at the free space so put() cannot overflow.
        ByteBuffer readBuffer = ByteBuffer.allocate(Math.min(32, buffer.remaining()));
        if (socketChannel.read(readBuffer) < 0) {
            // End of stream: the client disconnected without sending "bye".
            closeConnection(key);
            return;
        }
        readBuffer.flip();
        buffer.put(readBuffer);
    }

    /**
     * If the attached buffer holds a complete CRLF-terminated line, prints it, writes
     * {@code "echo:" + line} back to the client and removes the line from the buffer. Closes
     * the connection after echoing the line {@code bye}.
     *
     * @param key selection key of a client connection with a ByteBuffer attachment
     * @throws IOException if writing or closing fails
     */
    public void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        buffer.flip();
        // Decode a duplicate so the attached buffer keeps its own position and limit.
        String data = charset.decode(buffer.duplicate()).toString();
        int lineEnd = data.indexOf(LINE_END);
        if (lineEnd == -1) {
            // No complete line yet: return the buffer to write mode untouched.
            buffer.position(buffer.limit());
            buffer.limit(buffer.capacity());
            return;
        }
        String outputData = data.substring(0, lineEnd + LINE_END.length());
        System.out.println(outputData);

        ByteBuffer reply = charset.encode("echo:" + outputData);
        while (reply.hasRemaining()) {
            socketChannel.write(reply);
        }

        // Drop the bytes of the echoed line and return the buffer to write mode.
        int consumedBytes = charset.encode(outputData).remaining();
        buffer.position(Math.min(consumedBytes, buffer.limit()));
        buffer.compact();

        if (outputData.equals("bye" + LINE_END)) {
            closeConnection(key);
        }
    }

    /** Cancels the key and closes its channel. */
    private void closeConnection(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("關閉與客戶端的連線!");
    }

    public static void main(String[] args) {
        final EchoServer server = new EchoServer();
        new Thread(new Runnable() {
            @Override
            public void run() {
                server.accept();
            }
        }).start();
        server.service();
    }
}

// ---------------------------------------------------------------------------
// File 3 of 3: EchoServer.java (fully non-blocking)
// ---------------------------------------------------------------------------
package non_blocking;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded, fully non-blocking echo server: one selector loop accepts connections and
 * echoes CRLF-terminated lines back to the clients.
 */
public class EchoServer {
    /** Line terminator used by the echo protocol. */
    private static final String LINE_END = "\r\n";

    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private int port = 8000;
    private Charset charset = Charset.forName("GBK");

    /**
     * Opens the selector and binds a non-blocking listening socket to port 8000.
     *
     * @throws UncheckedIOException if the server cannot be started
     */
    public EchoServer() {
        try {
            selector = Selector.open();
            // open() is static; call it on the class, not through a (null) instance field.
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            System.out.println("服務端已經啟動!");
        } catch (IOException e) {
            closeOnFailure(serverSocketChannel, e);
            closeOnFailure(selector, e);
            throw new UncheckedIOException("Cannot start echo server on port " + port, e);
        }
    }

    /** Closes {@code resource} if non-null, attaching any close failure to {@code cause}. */
    private static void closeOnFailure(Closeable resource, IOException cause) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException suppressed) {
            cause.addSuppressed(suppressed);
        }
    }

    /**
     * Selector loop: accepts new connections and dispatches read/write readiness to
     * {@link #receive(SelectionKey)} and {@link #send(SelectionKey)}. A failure on one client
     * closes only that client. The loop ends when the selector itself fails or the thread is
     * interrupted.
     */
    public void service() {
        try {
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (ClosedChannelException e) {
            e.printStackTrace();
            return;
        }

        while (!Thread.currentThread().isInterrupted()) {
            try {
                // A return value of 0 is not an error; just wait again.
                if (selector.select() == 0) {
                    continue;
                }
            } catch (IOException e) {
                e.printStackTrace();
                return;
            }

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();

                if (key.isValid() && key.isAcceptable()) {
                    acceptClient(key);
                    continue;
                }

                try {
                    // isValid() guards against a key cancelled by the previous handler.
                    if (key.isValid() && key.isReadable()) {
                        receive(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        send(key);
                    }
                } catch (IOException | RuntimeException e) {
                    e.printStackTrace();
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * Accepts one pending connection and registers it for read and write readiness with a
     * fresh 1024-byte buffer attached. A failure closes only the new connection, never the
     * listening socket.
     */
    private void acceptClient(SelectionKey key) {
        SocketChannel socketChannel = null;
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            socketChannel = ssc.accept();
            if (socketChannel == null) {
                // Non-blocking accept: the pending connection is already gone.
                return;
            }
            System.out.println("接受來自" + socketChannel.socket().getInetAddress()
                    + "埠:" + socketChannel.socket().getPort());
            socketChannel.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            socketChannel.register(
                    selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
        } catch (IOException e) {
            e.printStackTrace();
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * Reads up to 32 bytes from the client and appends them to the buffer attached to the key.
     * Never reads more than the attached buffer can hold, and closes the connection when the
     * client has closed its end.
     *
     * @param key selection key of a client connection with a ByteBuffer attachment
     * @throws IOException if reading or closing fails
     */
    public void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        buffer.limit(buffer.capacity());
        // Cap the chunk at the free space so put() cannot overflow.
        ByteBuffer buff = ByteBuffer.allocate(Math.min(32, buffer.remaining()));
        if (socketChannel.read(buff) < 0) {
            // End of stream: the client disconnected without sending "bye".
            closeConnection(key);
            return;
        }
        buff.flip();
        buffer.put(buff);
    }

    /**
     * If the attached buffer holds a complete CRLF-terminated line, prints it, writes
     * {@code "echo:" + line} back to the client and removes the line from the buffer. Closes
     * the connection after echoing the line {@code bye}.
     *
     * @param key selection key of a client connection with a ByteBuffer attachment
     * @throws IOException if writing or closing fails
     */
    public void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        buffer.flip();
        // Decode a duplicate so the attached buffer keeps its own position and limit.
        String data = charset.decode(buffer.duplicate()).toString();
        int lineEnd = data.indexOf(LINE_END);
        if (lineEnd == -1) {
            // No complete line yet: return the buffer to write mode untouched.
            buffer.position(buffer.limit());
            buffer.limit(buffer.capacity());
            return;
        }
        String outputData = data.substring(0, lineEnd + LINE_END.length());
        System.out.println(outputData);

        ByteBuffer echoBuffer = charset.encode("echo:" + outputData);
        while (echoBuffer.hasRemaining()) {
            socketChannel.write(echoBuffer);
        }

        // Drop the bytes of the echoed line and return the buffer to write mode.
        int consumedBytes = charset.encode(outputData).remaining();
        buffer.position(Math.min(consumedBytes, buffer.limit()));
        buffer.compact();

        if (outputData.equals("bye" + LINE_END)) {
            closeConnection(key);
        }
    }

    /** Cancels the key and closes its channel. */
    private void closeConnection(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("關閉伺服器之間的連線");
    }

    public static void main(String[] args) {
        new EchoServer().service();
    }
}