AIO實現非同步通訊
阿新 • • 發佈:2018-12-24
Asynchronous ---> Channel
- 重點
- AsynchronousSocketChannel 和 AsynchronousServerSocketChannel
- 支援TCP的非同步Channel
-
AsynchronousSocketChannel
- 負責監聽的Channel
- 建立方法:
- 呼叫AsynchronousServerSocketChannel.open()靜態方法返回該類的例項
- 呼叫bind()例項方法指定監聽的地址和埠號
- open()
- open() 建立預設的AsynchronousServerSocketChannel
- open(AsynchronousChannelGroup g) : 使用指定的AsynchronousChannelGroup來建立AsynchronousServerSocketChannel。
- AsynchronousChannelGroup
- 是一個非同步的Channel的分組管理器,可以實現資源共享。
- 建立AsynchronousChannelGroup 時需要傳入一個ExecutorService.實質時繫結一個執行緒池。該執行緒池負責處理IO事件和觸發CompletionHander()處理器
- accept()接受客戶端的連結
- Future accept(): 接受客戶端請求,若需要返回AsynchronousSocketChannel,則應該呼叫Future的get()方法。get()方法會阻塞執行緒,等待返回AsynchronousSocketChannel。因此這種方式依然會阻塞執行緒(必須等待get()方法返回)
- void accpt(A attachment ,CompletionHander hander ) : 接受請求,不管成功還是失敗都會觸發CompletionHander的相應方法 (真的實現了非同步)
- CompletionHander
- completed(V result, A attachment): io操作完成後觸發,第一個引數代表IO操作返回的物件
- failed (Throwable exc, A attachment) io失敗觸發,第一個引數代表IO操作返回的異常或錯誤
- 另外不只accept()可以接受CompletionHander監聽器,AsynchronousSocketChannel的connect() read(),write()方法都有兩個版本,其中一個可以接受CompletionHander,和accept() 相同,預設情況都是阻塞的,需要呼叫get(),並返回
-
AsynchronousSocketChannel
- 建立 : open()和AsynchronousServerSocketChannel的open相同
- 呼叫connect()方法連結到指定ip和埠
- 呼叫 read() write() 方法讀寫
使用 無參 open()建立預設AsynchronousServerSocketChannel 和阻塞式的 accept()
public class SimpleAIOServer
{
static final int PORT = 30000;
public static void main(String[] args)
throws Exception
{
try(
// ①建立AsynchronousServerSocketChannel物件。
AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open())
{
// ②指定在指定地址、埠監聽。
serverChannel.bind(new InetSocketAddress(PORT));
while (true)
{
// ③採用迴圈接受來自客戶端的連線
Future<AsynchronousSocketChannel> future
= serverChannel.accept();
// 獲取連線完成後返回的AsynchronousSocketChannel
AsynchronousSocketChannel socketChannel = future.get();
// 執行輸出。
socketChannel.write(ByteBuffer.wrap("歡迎你來自AIO的世界!"
.getBytes("UTF-8"))).get();
}
}
}
}
使用 無參 open()建立預設AsynchronousSocketChannel 和阻塞式的 connect()
public class SimpleAIOClient
{
static final int PORT = 30000;
public static void main(String[] args)
throws Exception
{
// 用於讀取資料的ByteBuffer。
ByteBuffer buff = ByteBuffer.allocate(1024);
Charset utf = Charset.forName("utf-8");
try(
// ①建立AsynchronousSocketChannel物件
AsynchronousSocketChannel clientChannel
= AsynchronousSocketChannel.open())
{
// ②連線遠端伺服器
clientChannel.connect(new InetSocketAddress("127.0.0.1"
, PORT)).get(); // ④
buff.clear();
// ③從clientChannel中讀取資料
clientChannel.read(buff).get(); // ⑤
buff.flip();
// 將buff中內容轉換為字串
String content = utf.decode(buff).toString();
System.out.println("伺服器資訊:" + content);
}
}
}
伺服器端使用帶AsynchronousChannelGroup分組管理的open()和帶CompletionHander監聽處理器的accept(),read()方法。並分別重寫了 accept() 和read()方法需要的CompletionHander監聽處理器
另外再準備客戶端下一次連結的地方,會巢狀監聽埠,即accept()連結成功後會觸發CompletionHander,而CompletionHander中又開啟了監聽下一次。若再有客戶端接入,將會再次觸發CompletionHander。
public class AIOServer
{
static final int PORT = 30000;
final static String UTF_8 = "utf-8";
static List<AsynchronousSocketChannel> channelList
= new ArrayList<>();
public void startListen() throws InterruptedException,
Exception
{
// 建立一個執行緒池
ExecutorService executor = Executors.newFixedThreadPool(20);
// 以指定執行緒池來建立一個AsynchronousChannelGroup
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup
.withThreadPool(executor);
// 以指定執行緒池來建立一個AsynchronousServerSocketChannel
AsynchronousServerSocketChannel serverChannel
= AsynchronousServerSocketChannel.open(channelGroup)
// 指定監聽本機的PORT埠
.bind(new InetSocketAddress(PORT));
// 使用CompletionHandler接受來自客戶端的連線請求
serverChannel.accept(null, new AcceptHandler(serverChannel)); // ①
Thread.sleep(100000);
}
public static void main(String[] args)
throws Exception
{
AIOServer server = new AIOServer();
server.startListen();
}
}
// 實現自己的CompletionHandler類
class AcceptHandler implements
CompletionHandler<AsynchronousSocketChannel, Object>
{
private AsynchronousServerSocketChannel serverChannel;
public AcceptHandler(AsynchronousServerSocketChannel sc)
{
this.serverChannel = sc;
}
// 定義一個ByteBuffer準備讀取資料
ByteBuffer buff = ByteBuffer.allocate(1024);
// 當實際IO操作完成時候觸發該方法
@Override
public void completed(final AsynchronousSocketChannel sc
, Object attachment)
{
// 記錄新連線的進來的Channel
AIOServer.channelList.add(sc);
// 準備接受客戶端的下一次連線
serverChannel.accept(null , this);
sc.read(buff , null
, new CompletionHandler<Integer,Object>() // ②
{
@Override
public void completed(Integer result
, Object attachment)
{
buff.flip();
// 將buff中內容轉換為字串
String content = StandardCharsets.UTF_8
.decode(buff).toString();
// 遍歷每個Channel,將收到的資訊寫入各Channel中
for(AsynchronousSocketChannel c : AIOServer.channelList)
{
try
{
c.write(ByteBuffer.wrap(content.getBytes(
AIOServer.UTF_8))).get();
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
buff.clear();
// 讀取下一次資料
sc.read(buff , null , this);
}
@Override
public void failed(Throwable ex, Object attachment)
{
System.out.println("讀取資料失敗: " + ex);
// 從該Channel讀取資料失敗,就將該Channel刪除
AIOServer.channelList.remove(sc);
}
});
}
@Override
public void failed(Throwable ex, Object attachment)
{
System.out.println("連線失敗: " + ex);
}
}
客戶端程式同理(該程式有圖形介面)
public class AIOClient
{
final static String UTF_8 = "utf-8";
final static int PORT = 30000;
// 與伺服器端通訊的非同步Channel
AsynchronousSocketChannel clientChannel;
JFrame mainWin = new JFrame("多人聊天");
JTextArea jta = new JTextArea(16 , 48);
JTextField jtf = new JTextField(40);
JButton sendBn = new JButton("傳送");
public void init()
{
mainWin.setLayout(new BorderLayout());
jta.setEditable(false);
mainWin.add(new JScrollPane(jta), BorderLayout.CENTER);
JPanel jp = new JPanel();
jp.add(jtf);
jp.add(sendBn);
// 傳送訊息的Action,Action是ActionListener的子介面
Action sendAction = new AbstractAction()
{
public void actionPerformed(ActionEvent e)
{
String content = jtf.getText();
if (content.trim().length() > 0)
{
try
{
// 將content內容寫入Channel中
clientChannel.write(ByteBuffer.wrap(content
.trim().getBytes(UTF_8))).get(); //①
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
// 清空輸入框
jtf.setText("");
}
};
sendBn.addActionListener(sendAction);
// 將Ctrl+Enter鍵和"send"關聯
jtf.getInputMap().put(KeyStroke.getKeyStroke('\n'
, java.awt.event.InputEvent.CTRL_DOWN_MASK) , "send");
// 將"send"和sendAction關聯
jtf.getActionMap().put("send", sendAction);
mainWin.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
mainWin.add(jp , BorderLayout.SOUTH);
mainWin.pack();
mainWin.setVisible(true);
}
public void connect()
throws Exception
{
// 定義一個ByteBuffer準備讀取資料
final ByteBuffer buff = ByteBuffer.allocate(1024);
// 建立一個執行緒池
ExecutorService executor = Executors.newFixedThreadPool(80);
// 以指定執行緒池來建立一個AsynchronousChannelGroup
AsynchronousChannelGroup channelGroup =
AsynchronousChannelGroup.withThreadPool(executor);
// 以channelGroup作為組管理器來建立AsynchronousSocketChannel
clientChannel = AsynchronousSocketChannel.open(channelGroup);
// 讓AsynchronousSocketChannel連線到指定IP、指定埠
clientChannel.connect(new InetSocketAddress("127.0.0.1"
, PORT)).get();
jta.append("---與伺服器連線成功---\n");
buff.clear();
clientChannel.read(buff, null
, new CompletionHandler<Integer,Object>() //②
{
@Override
public void completed(Integer result, Object attachment)
{
buff.flip();
// 將buff中內容轉換為字串
String content = StandardCharsets.UTF_8
.decode(buff).toString();
// 顯示從伺服器端讀取的資料
jta.append("某人說:" + content + "\n");
buff.clear();
clientChannel.read(buff , null , this);
}
@Override
public void failed(Throwable ex, Object attachment)
{
System.out.println("讀取資料失敗: " + ex);
}
});
}
public static void main(String[] args)
throws Exception
{
AIOClient client = new AIOClient();
client.init();
client.connect();
}
}