Day14.高效能RPC設計 學習筆記2
阿新 • • 發佈:2018-12-27
一、通道選擇器
通道註冊
:需要使用Selector管理通道,然後將就緒的通道封裝成SelectionKey物件。
- 設定通道為非阻塞 ServerSocketChannel/SocketChannel#configureBlocking(false)
- 註冊通道ServerSocketChannel/SocketChannel#register(selector,事件型別[,附件資訊])
NIO的網路程式設計的思想是基於非同步事件處理,底層通過Selector去管理註冊列表,一旦註冊列表的相關通道就緒,selector就會將就緒的通道放置在事件處理佇列中,使用者可以通過Selector#selectedKeys()
所有就緒的key只能被處理一次,因此使用者必須在處理完事件key後,將該事件在事件處理列表中移除。
事件取消註冊
:通道關閉
,key#cancel()
ServerSocketChannel ssc=ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(9999));
//設定通道非阻塞
ssc.configureBlocking(false);
//建立通道選擇器
Selector selector=Selector.open();
//註冊ACCEPT事件型別 轉發
ssc.register( selector,SelectionKey.OP_ACCEPT);
//迭代遍歷事件key
while(true){
//返回需要處理的事件個數,如果沒有該方法會阻塞,也有可能直接返回0(當程式呼叫Selector#wakeup)
int num = selector.select();
if(num >0){
//事件處理
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
//處理對應的事件key
if(key.isAcceptable()){
//處理轉發事件
ServerSocketChannel channel= (ServerSocketChannel) key.channel();
SocketChannel s=channel.accept();//立即返回一個不為null的SocketChannel
s.configureBlocking(false);//註冊讀
s.register(selector,SelectionKey.OP_READ);
}else if(key.isReadable()){
//處理讀事件
SocketChannel s= (SocketChannel) key.channel();
//處理讀
...
//註冊寫
s.register(selector,SelectionKey.OP_WRITE[,請求引數]);
}else if(key.isWritable()){
//處理寫事件
SocketChannel s= (SocketChannel) key.channel();
//根據請求引數給出響應
...
s.shutdownOutput();//告知寫結束
s.close();
}
//移除key
keys.remove();
}
}
}
二、NIO單執行緒版本
public class NIOBootstrapServer {
public static void main(String[] args) throws IOException {
//1、建立ServerSocket
ServerSocketChannel ssc=ServerSocketChannel.open();
//2、繫結監聽埠
ssc.bind(new InetSocketAddress(9999));
//3、設定通道非阻塞
ssc.configureBlocking(false);
//4、建立通道選擇器
Selector selector= Selector.open(); //nio多是open來註冊東西
//5、註冊ACCEPT事件型別 轉發
ssc.register(selector,SelectionKey.OP_ACCEPT);
//迭代遍歷事件key
while(true){
//返回需要處理的事件個數,如果沒有該方法會阻塞,也有可能直接返回0(當程式呼叫Selector#wakeup)
System.out.println("嘗試選擇待處理的keys...");
//可以出來keys的數目,如果沒有該方法block
int num = selector.select();
if(num >0){
//事件處理
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
//處理對應的事件key
if(key.isAcceptable()){
System.out.println("處理轉發同時註冊讀...");
//處理轉發事件
ServerSocketChannel channel= (ServerSocketChannel) key.channel();
SocketChannel s=channel.accept();//立即返回一個不為null的SocketChannel
s.configureBlocking(false);//設定非阻塞
//註冊讀
s.register(selector,SelectionKey.OP_READ,new ByteArrayOutputStream());
}else if(key.isReadable()){
System.out.println("處理讀...");
//處理讀事件
SocketChannel s= (SocketChannel) key.channel(); //拿到註冊過的SocketChannel
//處理讀
ByteBuffer buffer=ByteBuffer.allocate(1024);
ByteArrayOutputStream baos= (ByteArrayOutputStream) key.attachment();
//一次嘗試讀取一個緩衝區
int n=s.read(buffer);
if(n==-1){
System.out.println("伺服器收到:"+new String(baos.toByteArray()));
//根據請求引數給出響應
ByteArrayInputStream bais=new ByteArrayInputStream((new Date().toLocaleString()).getBytes());
//註冊寫
s.register(selector, SelectionKey.OP_WRITE,bais);
}else{
buffer.flip();
baos.write(buffer.array(),0,n);
}
}else if(key.isWritable()){
System.out.println("處理寫...");
//處理寫事件
SocketChannel s= (SocketChannel) key.channel();
ByteArrayInputStream bais = (ByteArrayInputStream)key.attachment();
byte[] bytes=new byte[1024];
int n = bais.read(bytes);
if(n==-1){
s.shutdownOutput();//告知寫結束
s.close(); //關閉通道
}else{
s.write(ByteBuffer.wrap(bytes,0,n));
}
}
//移除key
keys.remove();
}
}
}
}
}
三、NIO多執行緒版【瞭解】
依賴
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.6</version>
</dependency>
完整實現如下
public class NIOBootstrapServerPool {
//該執行緒池主要負責請求的轉發
private static ExecutorService master= Executors.newFixedThreadPool(66);
//該執行緒池主要負責請求的響應
private static ExecutorService worker= Executors.newFixedThreadPool(66);
//註冊轉發佇列
private static final AtomicBoolean NEED_REG_DISPATH= new AtomicBoolean(false);
//註冊讀佇列
private static final List<ChannelAndAtt> READ_QUEUE= new Vector<ChannelAndAtt>();
//註冊寫佇列
private static final CopyOnWriteArrayList<ChannelAndAtt> WRITE_QUEUE= new CopyOnWriteArrayList<ChannelAndAtt>();
public static void main(String[] args) throws IOException {
//建立serverSocket
ServerSocketChannel ssc=ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(9999));
//設定通道非阻塞
ssc.configureBlocking(false);
//建立通道選擇器selector
Selector selector= Selector.open();
//註冊ACCEPT事件型別 轉發
ssc.register(selector,SelectionKey.OP_ACCEPT);
//迭代遍歷事件key
while(true){
//返回需要處理的事件個數,如果沒有該方法會阻塞,也有可能直接返回0(當程式呼叫Selector#wakeup)
// System.out.println("嘗試選擇待處理的keys...");
int num = selector.select(1);
if(num >0){
//事件處理
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
//處理對應的事件key
if(key.isAcceptable()){ //請求轉發
key.cancel();//取消轉發註冊
master.submit(new ProcessDispatcher(key,selector));
}else if(key.isReadable()){ //讀取IO處理
key.cancel();//取消讀註冊
worker.submit(new ProcessRead(key,selector));
}else if(key.isWritable()){ //響應IO處理
key.cancel();//取消寫註冊
worker.submit(new ProcessWrite(key,selector));
}
//刪除當前事件key,刪除並不意味著取消註冊
keys.remove();
}
}else {
if(NEED_REG_DISPATH.get()){//需要重新註冊ACCEPT
System.out.println("重新註冊ACCEPT");
ssc.register(selector,SelectionKey.OP_ACCEPT);
NEED_REG_DISPATH.set(false);
}
while(READ_QUEUE.size()>0){
ChannelAndAtt channelAndAtt = READ_QUEUE.remove(0);
//註冊讀
System.out.println("註冊READ");
channelAndAtt.getChannel().register(selector,SelectionKey.OP_READ,channelAndAtt.att);
}
while(WRITE_QUEUE.size()>0){
ChannelAndAtt channelAndAtt = WRITE_QUEUE.remove(0);
//註冊寫
System.out.println("註冊寫");
channelAndAtt.getChannel().register(selector,SelectionKey.OP_WRITE,channelAndAtt.att);
}
}
}
}
/**
* 處理請求寫
*/
public static class ProcessWrite implements Runnable{
private SelectionKey key;
private Selector selector;
public ProcessWrite(SelectionKey key, Selector selector) {
this.key = key;
this.selector = selector;
}
@Override
public void run() {
try {
SocketChannel s= (SocketChannel) key.channel();
ByteArrayInputStream bais = (ByteArrayInputStream)key.attachment();
byte[] bytes=new byte[1024];
int n = bais.read(bytes);//最多從bais獲取一個緩衝區的資料
if(n==-1){
s.shutdownOutput();//告知寫結束
s.close(); //關閉通道
}else{
//最多寫一個緩衝區的資料
s.write(ByteBuffer.wrap(bytes,0,n));
//恢復寫註冊
WRITE_QUEUE.add(new ChannelAndAtt(s,bais));
}
//打破main執行緒
selector.wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 處理請求讀
*/
public static class ProcessRead implements Runnable{
private SelectionKey key;
private Selector selector;
public ProcessRead(SelectionKey key, Selector selector) {
this.key = key;
this.selector = selector;
}
@Override
public void run() {
try {
//處理讀事件
SocketChannel s= (SocketChannel) key.channel();
//處理讀
ByteBuffer buffer=ByteBuffer.allocate(1024);
ByteArrayOutputStream baos= (ByteArrayOutputStream) key.attachment();
int n=s.read(buffer);
if(n==-1){
//根據請求引數給出響應
Object req= SerializationUtils.deserialize(baos.toByteArray());
System.out.println("伺服器收到:"+req);
ByteArrayInputStream bais=new ByteArrayInputStream(SerializationUtils.serialize(new Date()));
//註冊寫
WRITE_QUEUE.add(new ChannelAndAtt(s,bais));
}else{
buffer.flip();
baos.write(buffer.array(),0,n);
//恢復讀註冊
READ_QUEUE.add(new ChannelAndAtt(s,baos));
}
//打斷mian執行緒阻塞
selector.wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 處理請求轉發
*/
public static class ProcessDispatcher implements Runnable{
private SelectionKey key;
private Selector selector;
public ProcessDispatcher(SelectionKey key, Selector selector) {
this.key = key;
this.selector = selector;
}
@Override
public void run() {
try {
//獲取通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel s = ssc.accept();
s.configureBlocking(false);
/