Thinkphp6使用騰訊雲傳送簡訊步驟 Thinkphp6使用騰訊雲傳送簡訊步驟
阿新 • • 發佈:2022-03-11
想學Netty,所以先學下一直也沒開始學的NIO。
傳統的同步阻塞模型
下面是類似Java的虛擬碼編寫的基於Java傳統BIO的socket服務程式:
while(true) {
sock = serverSocket.accept(); // 請求到來前阻塞
ins = sock.getInputStream();
ins.read(buf); // 當有資料讀取之前阻塞
result = handle(buf);
sock.getOutputStream().write(result); // 可寫之前,阻塞
}
- 當呼叫
accept
時,產生阻塞,在請求到來之前,當前執行緒啥都不能再幹了,只能乾等著 - 當呼叫
read
時,如果流中並沒有可讀資料也會阻塞
- 呼叫
write
時,如果寫緩衝滿了,該方法也會阻塞
對於第三點,可以這樣測試,不想看程式碼直接忽略即可。
public class ClientBIO { public static void main(String[] args) throws IOException, InterruptedException { Socket socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1", 11441)); // 迴圈寫入大量資料,佔滿緩衝區 for (int i=0;i < 100; i++) { socket.getOutputStream().write(new byte[1024 * 1024]); System.out.println("send"+i); } Thread.sleep(10000); socket.close(); } } public class ServerBIO { public static void main(String[] args) throws IOException, InterruptedException { ServerSocket ssock = new ServerSocket(11441); while (true) { Socket sock = ssock.accept(); // 暫停5s,不進行讀取 Thread.sleep(5000); InputStream ins = sock.getInputStream(); byte[] buf = new byte[1024]; long l = 0; while ((l = ins.read(buf)) != -1) { } } } }
在我的機器上,write會在第三次迴圈時被阻塞,稍後當Server開始讀取,程式開始正常運作。
可以看到傳統的BIO在accept、read和write上都會阻塞,其過程如下:
- 發起IO呼叫
- 執行緒阻塞等待IO事件發生
- IO事件發生後的某個時間,之前阻塞的執行緒被喚醒
- 喚醒後繼續執行下面的程式碼
所以傳統的BIO都採用排程執行緒+工作執行緒+執行緒池的方式來排程
while(true) { // dispatcher,不斷接收新請求 sock = server.accept(); // 通過執行緒池開啟一個請求處理器 executor.submit(new RequestHandler(sock)); }
但是執行緒建立銷燬切換成本高,佔用空間大這是一直被詬病的,大家都在尋求更好的辦法。還有個最重要的問題,當執行緒因為外部IO事件阻塞時,這個執行緒就喪失了繼續工作的能力。
NIO模型
NIO是非阻塞IO,它不再阻塞在accept
、read
這種IO操作上,它向系統註冊一類事件,然後當註冊的事件發生時,當前執行緒會接收到事件。
NIO有三種元件
- Buffer
- Channel
- Selector
Buffer就是傳統意義上的位元組緩衝區,Channel代表某種連線的通道,比如Socket,Selector中可以註冊一系列Channel的事件,然後呼叫select
輪詢是否有事件發生。
// 註冊serverSocketChannel,ACCEPT事件
serverSocketChannel.register(selector, OP_ACCEPT);
while(true) {
selector.select(); // 當沒有事件要處理時就阻塞,有就被喚醒
// 判斷事件型別,處理...
}
下面是使用NIO編寫SocketServer的一個示例:
public class ServerNIO {
public static void main(String[] args) {
new ServerNIO().serveForever();
}
private void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
System.out.println("accept " + sc.getLocalAddress().toString());
sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024));
}
private void handleRead(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long br;
while ((br = sc.read(buf)) > 0) {
buf.flip();
while (buf.hasRemaining()) {
System.out.print((char)buf.get());
}
System.out.println();
buf.clear();
}
if (br == -1) Utils.closeClosable(sc);
}
public void serveForever() {
Selector selector = null;
ServerSocketChannel ssc = null;
try {
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress("0.0.0.0", 11441));
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("SELECT");
if (selector.select(10000) == 0) continue;
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
handleAccept(key);
} else if (key.isReadable()) {
handleRead(key);
}
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
Utils.closeClosable(selector, ssc);
}
}
}