1. 程式人生 > >Reactor模型-多執行緒程版

Reactor模型-多執行緒程版

1.概述

Reactor單執行緒版本的設計中,I/O任務乃至業務邏輯都由Reactor執行緒來完成,這無疑增加了Reactor執行緒的負擔,高負載情況下必然會出現效能瓶頸。此外,對於多處理器的伺服器來說,單個Reactor執行緒也發揮不了多CPU的最大功效。下面我們對之前單執行緒版的Reactor進行改進。

改進方向
  1. 接受客戶端連線請求的不在是單個執行緒-Acceptor,而是一個NIO執行緒池。
  2. I/O處理也不再是單個執行緒處理,而是交給一個I/O執行緒池進行處理。

其實改進方向很明確:就是針對可能的系統瓶頸,由單執行緒改進為多執行緒處理。這樣的方案帶來的好處顯而易見,增加可靠性的同時也發揮多執行緒的優勢,在高負載的情況下能夠從容應對。

Key Word

Java NIO 事件驅動 主從Reactor模型


2.code未動,test先行

首先定義服務端用於處理請求的Handler,通過實現ChannelHandler介面完成。

public class SimpleServerChannelHandler implements ChannelHandler { private static Logger LOG = LoggerFactory.getLogger(SimpleServerChannelHandler.class); //記錄接受訊息的次數 public volatile int receiveSize; //記錄丟擲的異常 public volatile Throwable t; @Override public void channelActive(NioChannel channel) { if(LOG.isDebugEnabled()){ LOG.debug("ChannelActive"); } } @Override public void channelRead(NioChannel channel, Object msg) throws Exception { ByteBuffer bb = (ByteBuffer)msg; byte[] con = new byte[bb.remaining()]; bb.get(con); String str = new String(con,0,con.length); String resp = ""; switch(str){ case "request1":resp = "response1";break; case "request2":resp = "response2";break; case "request3":resp = "response3";break; default :resp = "Hello Client"; } ByteBuffer buf = ByteBuffer.allocate(resp.getBytes().length); buf.put(resp.getBytes()); receiveSize++; channel.sendBuffer(buf); } @Override public void exceptionCaught(NioChannel channel, Throwable t) throws Exception { this.t = t; channel.close(); } } 

Junit測試用例,setUp用於啟動Server端和Client端。

public class ReactorTest extends BaseTest{ private static final Logger LOG = LoggerFactory.getLogger(ReactorTest.class); private static String HOST = "localhost"; private static int PORT = 8888; private static Client client; private static Server server; static SimpleServerChannelHandler h; @BeforeClass public static void setUp() throws Exception { startServer(); startClient(); } private static void startServer() throws Exception{ server = new Server(); ReactorPool mainReactor = new ReactorPool(); ReactorPool subReactor = new ReactorPool(); h = new SimpleServerChannelHandler(); server.reactor(mainReactor, subReactor) .handler(h) .bind(new InetSocketAddress(HOST,PORT)); } private static void startClient() throws SocketException{ client = new Client(); client.socket().setTcpNoDelay(true); client.connect( new InetSocketAddress(HOST,PORT)); } @Test public void test() { LOG.info("Sucessful configuration"); } @Test public void testBaseFunction(){ LOG.debug("testBaseFunction()"); String msg ="Hello Reactor"; ByteBuffer resp = client.syncSend(ByteBuffer.wrap(msg.getBytes())); byte[] res = new byte[resp.remaining()]; resp.get(res); Assert.assertEquals("Hello Client", new String(res,0,res.length)); } @Test public void testMultiSend(){ int sendSize = 1024; for(int i = 0; i < sendSize; i++){ ByteBuffer bb = ByteBuffer.wrap("Hello Reactor".getBytes()); ByteBuffer resp = client.syncSend(bb); byte[] res = new byte[resp.remaining()]; resp.get(res); Assert.assertEquals("Hello Client", new String(res,0,res.length)); } Assert.assertEquals(sendSize, h.receiveSize); } @Test public void testTooLongReceivedByteSizeEexception(){ LOG.debug("testTooLongReceivedByteSizeEexception()"); int threshold = 1024; byte[] dest = new byte[threshold + 1]; Random r = new Random(); r.nextBytes(dest); client.syncSend(ByteBuffer.wrap(dest)); Assert.assertEquals(IllegalArgumentException.class, h.t.getClass()); Assert.assertEquals("Illegal data length, len:" + (threshold+1), h.t.getMessage()); } @AfterClass public static void tearDown() throws Exception { server.close(); client.close(); } } 

一共進行三項基本測試:

testBaseFunction

實現了基本傳送接收訊息的功能。

testMultiSend

重複傳送訊息,並且記錄訊息收發的次數。

testTooLongReceivedByteSizeEexception

測試server端在接收到異常碼流的情況下,是否丟擲異常。

3.設計及實現

3.1 Reactor和ReactorPool

Reactor作用就是不斷進行輪詢並檢查是否有已經就緒的事件,如果有,那麼就將事件分發給對應的Handler進行處理。這個角色其實就是NIO程式設計中的多路複用器java.nio.channels.Selector。因此,Reactor聚合一個Selector型別成員變數。輪詢的過程如下:

public class Reactor extends Thread{ //... private Selector selector; private volatile boolean isShutdown; Reactor(){ try { selector = Selector.open(); } catch (IOException e) { throw new RuntimeException("failed to open a new selector", e); } } @Override public void run() { for(;;){ try { getSelector().select(wakenUp); Set<SelectionKey> keys; synchronized(this){ keys = getSelector().selectedKeys(); } Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()){ SelectionKey key = it.next(); processSelectedKey(key); it.remove(); } if(isShutdown()){ break; } } catch (Throwable e) { LOG.warn("Unexpected exception in the selector loop.", e); try { Thread.sleep(1000); } catch (InterruptedException e1) { } } } } } 

processSelectedKey(key)中進行的就是根據就緒事件key.readyOps()進行相應操作:

    private void processSelectedKey(SelectionKey key){ try { NioChannel nioChannel = (NioChannel)key.attachment(); if (!nioChannel.isOpen()) { LOG.warn("trying to do i/o on a null socket"); return; } int readyOps = key.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { nioChannel.sink().doRead(); } if((readyOps & SelectionKey.OP_WRITE) != 0){ nioChannel.sink().doSend(); } if((readyOps & SelectionKey.OP_CONNECT) != 0){ //remove OP_CONNECT key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT)); } }catch (Throwable t) { if (LOG.isDebugEnabled()) { LOG.debug("Throwable stack trace", t); } closeSocket(); } } 

這裡的NioChannel是抽象類,是對NIO程式設計中的Channel語義的抽象(後面會有分析)。

此外,Reactor肯定要提供一個註冊介面啦。。。

    public SelectionKey register(final NioChannel sc, final int interestOps, Object attachment){ if(sc == null){ throw new NullPointerException("SelectableChannel"); } if(interestOps == 0){ throw new IllegalArgumentException("interestOps must be non-zero."); } SelectionKey key; try { key = sc.channel().register(getSelector(), interestOps, attachment); } catch (ClosedChannelException e) { throw new RuntimeException("failed to register a channel", e); } return key; } 

ReactorPool是一個Reactor的執行緒池,這裡就通過簡單的陣列形式進行模擬:

public class ReactorPool {

    private static final Logger LOG = LoggerFactory.getLogger(ReactorPool.class); private Reactor[] reactors; private AtomicInteger index = new AtomicInteger(); //執行緒數預設為CPU數*2 private final int DEFAULT_THREADS = Runtime.getRuntime().availableProcessors() * 2; public ReactorPool (){ this(0); } public ReactorPool(int nThreads){ if(nThreads < 0){ throw new IllegalArgumentException("nThreads must be nonnegative number"); } if(nThreads == 0){ nThreads = DEFAULT_THREADS; } reactors = new Reactor[nThreads]; for(int i = 0; i < nThreads; i++){ boolean succeed = false; try{ reactors[i] = new Reactor(); succeed = true; }catch(Exception e){ throw new IllegalStateException("failed to create a Reactor", e); }finally{ if (!succeed) { for (int j = 0; j < i; j ++) { reactors[j].close(); } } } } } public Reactor next(){ return reactors[index.incrementAndGet() % reactors.length]; } public void close(){ for(int i = 0; i < reactors.length; i++){ reactors[i].setShutdown(true); reactors[i].close(); } } } 

3.2 NioChannel和NioChannelSink

在進行Java原生Nio程式設計的過程中,會涉及到兩種型別的Channel:

  • java.nio.channels.SocketChannel
  • java.nio.channels.ServerSocketChannel

其分別作為客戶端和服務端呼叫介面。為了統一其公共行為,這裡抽象出一個抽象類NioChannel,其成員組成如下:

  • 聚合一個SelectableChannel型別(SocketChannel和ServerSocketChannel的公共父類)的成員變數。
  • 持有一個所屬Reactor物件的引用
  • 聚合一個NioChannelSink型別成員變數。

NioChannelSink是將NioChannel的底層讀寫功能獨立出來。一方面使NioChannel避免整合過多功能而顯得臃腫,另一方面分離出底層傳輸協議,為以後底層傳輸協議的切換做準備。(TCP vs UDP,NIO、OIO、AIO)從這種意義上說,NioChannel取名為Channel貌似更合理。

public abstract class NioChannel { protected Reactor reactor; protected SelectableChannel sc; protected SelectionKey selectionKey; private NioChannelSink sink; protected volatile ChannelHandler handler; public NioChannel(SelectableChannel sc, int interestOps){ this.sc = sc; try { sc.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); } sink = nioChannelSink(); } protected void fireChannelRead(ByteBuffer bb){ try { handler.channelRead(this, bb); } catch (Exception e) { fireExceptionCaught(e); } } protected void fireExceptionCaught(Throwable t){ try { handler.exceptionCaught(this, t); } catch (Exception e) { e.printStackTrace(); } } //。。。 public abstract NioChannelSink nioChannelSink(); public interface NioChannelSink{ void doRead(); void doSend(); void sendBuffer(ByteBuffer bb); void close(); } } 

再來分析下NioChannel需要提供哪些功能:

首先,NIO程式設計中SocketChannel或ServerSocketChannel需要註冊到多路複用器Selector中。那麼這裡就抽象成了NioChannel和Reactor的互動。

public void register(Reactor reactor, int interestOps){ this.reactor = reactor; try { selectionKey = sc.register(reactor().getSelector(), interestOps, this); } catch (ClosedChannelException e) { e.printStackTrace(); } } 

這裡將NioChannel物件作為附件,在Reactor中心輪詢到ready事件後,會根據事件的型別(OP_ACCEPT OP_READ等),從SelectionKey中取出繫結的附件NioChannel

NioChannel nioChannel = (NioChannel)key.attachment();

然後根據進行key.readyOps()做相應操作。這在Reactor中已經做過分析。

其次,作為Channel肯定要提供繫結bind和連線connect的功能了:

public abstract void bind(InetSocketAddress remoteAddress) throws Exception; public abstract void connect(InetSocketAddress remoteAddress) throws Exception; 

這裡用抽象方法是要將實現交由子類來完成。

最後,是使用者通過NioChannel傳送的訊息的函式:

public void sendBuffer(ByteBuffer bb){ sink().sendBuffer(bb); } protected final void enableWrite(){ int i = selectionKey.interestOps(); if((i & SelectionKey.OP_WRITE) == 0){ selectionKey.interestOps(i | SelectionKey.OP_WRITE); } } protected final void disableWrite(){ int i = selectionKey.interestOps(); if((i & SelectionKey.OP_WRITE) == 1){ selectionKey.interestOps(i & (~SelectionKey.OP_WRITE)); } } 

3.3 NioServerSocketChannel和NioSocketChannel

NioServerSocketChannel和NioSocketChannel是抽象類NioChannel的一個子類,NioServerSocketChannel和java.nio.channels.ServerSocketChannel的語義是一致的,供服務端使用,繫結指定埠,監聽客戶端發起的連線請求,並交由相應Handler處理。而NioSocketChannel和java.nio.channels.NioSocketChannel語義一致,作為通訊的一個通道。

public class NioServerSocketChannel extends NioChannel{ private static final Logger LOG = LoggerFactory.getLogger(NioServerSocketChannel.class); public NioServerSocketChannel(){ super(newSocket()); } public static ServerSocketChannel newSocket(){ ServerSocketChannel socketChannel = null; try { socketChannel = ServerSocketChannel.open(); } catch (IOException e) { LOG.error("Unexpected exception occur when open ServerSocketChannel"); } return socketChannel; } @Override public NioChannelSink nioChannelSink() { return new NioServerSocketChannelSink(); } class NioServerSocketChannelSink implements NioChannelSink{ //。。。 } @Override public void bind(InetSocketAddress remoteAddress) throws Exception { ServerSocketChannel ssc = (ServerSocketChannel)sc; ssc.bind(remoteAddress); } @Override public void connect(InetSocketAddress remoteAddress) throws Exception { throw new UnsupportedOperationException(); } } 

這裡獲取ServerSocketChannel例項的方式是通過ServerSocketChannel.open(),其實也可以通過反射來獲取,這樣就能將ServerSocketChannel和SocketChannel的例項化邏輯進行統一,我們只需要在例項化Channel的時候將ServerSocketChannel.class 或 SocketChannel.class當作引數傳入即可。

NioSocketChannel的實現如下:

public class NioSocketChannel extends NioChannel{ private static final Logger LOG = LoggerFactory.getLogger(NioSocketChannel.class); public NioSocketChannel() throws IOException{ super( newSocket()); } public NioSocketChannel(SocketChannel sc) throws IOException{ super(sc); } public static SocketChannel newSocket(){ SocketChannel socketChannel = null; try { socketChannel = SocketChannel.open(); } catch (IOException e) { } return socketChannel; } @Override public NioChannelSink nioChannelSink() { return new NioSocketChannelSink(); } class NioSocketChannelSink implements NioChannelSink{ //。。。 } @Override public void bind(InetSocketAddress remoteAddress) throws Exception { throw new UnsupportedOperationException(); } @Override public void connect(InetSocketAddress remoteAddress) throws Exception { SocketChannel socketChannel = (SocketChannel)sc; socketChannel.connect(remoteAddress); } } 

3.4 NioServerSocketChannelSink和NioSocketChannelSink

通過上面分析可知,NioChannel的只向上提供了操作介面,而具體的底層讀寫等功能全部代理給了NioChannelSink完成。接下來分析下NioChannelSink的兩個子類NioServerSocketChannelSink和NioSocketChannelSink。

首先再看下NioChannelSink的介面:

    public interface NioChannelSink{

        void doRead(); void doSend(); void sendBuffer(ByteBuffer bb); void close(); } 

對於NioChannelSink的兩個實現類來說,每個方法所對應的語義如下:

doRead()

  • NioServerSocketChannelSink:通過accept()接受客戶端的請求。
  • NioSocketChannelSink:讀取NioChannel中的資料

doSend()

  • NioServerSocketChannelSink:不支援。
  • NioSocketChannelSink:將緩衝區中資料寫入NioChannel

sendBuffer()

  • NioServerSocketChannelSink:不支援。
  • NioSocketChannelSink:傳送資料,其實就是將待發送資料加入緩衝佇列中。

close()

  • NioServerSocketChannelSink:關閉Channel。
  • NioSocketChannelSink:同上。

當然了,作為網路程式設計中的Channel所提供的功能原比這裡要多且複雜,作為學習Demo,這裡只實現了最常用的幾個功能。

下面看下NioServerSocketChannelSink的實現:

public class NioServerSocketChannel extends NioChannel{ //。。。 class NioServerSocketChannelSink implements NioChannelSink{ public void doRead() { try { ServerSocketChannel ssc = (ServerSocketChannel)sc; handler.channelRead(NioServerSocketChannel.this, new NioSocketChannel(ssc.accept())); if(LOG.isDebugEnabled()){ LOG.debug("Dispatch the SocketChannel to SubReactorPool"); } } catch (Exception e1) { e1.printStackTrace(); } } public void doSend(){ throw new UnsupportedOperationException(); } @Override public void sendBuffer(ByteBuffer bb) { throw new UnsupportedOperationException(); } @Override public void close() { try { if(sc != null){ sc.close(); } } catch (IOException e) { e.printStackTrace(); } } }// end NioChannelSink //。。。 } 

下面是NioSocketChannelSink實現:

public class NioSocketChannel extends NioChannel{ //。。。 class NioSocketChannelSink implements NioChannelSink{ private static final int MAX_LEN = 1024; ByteBuffer lenBuffer = ByteBuffer.allocate(4); ByteBuffer inputBuffer = lenBuffer; ByteBuffer outputDirectBuffer = ByteBuffer.allocateDirect(1024 * 64); LinkedBlockingQueue<ByteBuffer> outputQueue = new LinkedBlockingQueue<ByteBuffer>(); public void close(){ //clear buffer outputDirectBuffer = null; try { if(sc != null){ sc.close(); } } catch (IOException e) { e.printStackTrace(); } } public void doRead() { SocketChannel socketChannel = (SocketChannel)sc; int byteSize; try { byteSize = socketChannel.read(inputBuffer); if(byteSize < 0){ LOG.error("Unable to read additional data"); throw new RuntimeException("Unable to read additional data"); } if(!inputBuffer.hasRemaining()){ if(inputBuffer == lenBuffer){ //read length lenBuffer.flip(); int len = lenBuffer.getInt(); if(len < 0 || len > MAX_LEN){ throw new IllegalArgumentException("Illegal data length, len:" + len); } //prepare for receiving data inputBuffer = ByteBuffer.allocate(len); inputBuffer.clear(); }else{ //read data if(inputBuffer.hasRemaining()){ socketChannel.read(inputBuffer); } if(!inputBuffer.hasRemaining()){ inputBuffer.flip(); fireChannelRead(inputBuffer); //clear lenBuffer and waiting for next reading operation lenBuffer.clear(); inputBuffer = lenBuffer; } } } } catch (Throwable t) { if(LOG.isDebugEnabled()){ LOG.debug("Exception :" + t); } fireExceptionCaught(t); } } public void doSend(){ /** * write data to channel: * step 1: write the length of data(occupy 4 byte) * step 2: data content */ try { if(outputQueue.size() > 0){ ByteBuffer directBuffer = outputDirectBuffer; directBuffer.clear(); for(ByteBuffer buf : outputQueue){ buf.flip(); if(buf.remaining() > directBuffer.remaining()){ //prevent BufferOverflowException buf = (ByteBuffer) buf.slice().limit(directBuffer.remaining()); } //transfers the bytes remaining in buf into directBuffer int p = buf.position(); directBuffer.put(buf); //reset position buf.position(p); if(!directBuffer.hasRemaining()){ break; } } directBuffer.flip(); int sendSize = ((SocketChannel)sc).write(directBuffer); while(!outputQueue.isEmpty()){ ByteBuffer buf = outputQueue.peek(); int left = buf.remaining() - sendSize; if(left > 0){ buf.position(buf.position() + sendSize); break; } sendSize -= buf.remaining(); outputQueue.remove(); } } synchronized(reactor){ if(outputQueue.size() == 0){ //disable write disableWrite(); }else{ //enable write enableWrite(); } } } catch (Throwable t) { fireExceptionCaught(t); } } private ByteBuffer wrapWithHead(ByteBuffer bb){ bb.flip(); lenBuffer.clear(); int len = bb.remaining(); lenBuffer.putInt(len); ByteBuffer resp = ByteBuffer.allocate(len+4); lenBuffer.flip(); resp.put(lenBuffer); resp.put(bb); return resp; } public void sendBuffer(ByteBuffer bb){ try{ synchronized(this){ //wrap ByteBuffer with length header ByteBuffer wrapped = wrapWithHead(bb); outputQueue.add(wrapped); enableWrite(); } }catch(Exception e){ LOG.error("Unexcepted Exception: ", e); } } }// end NioSocketChannelSink //。。。 } 

NioSocketChannelSink中的讀寫功能在Reactor單執行緒版本里已經分析過,這裡就不再贅述。

3.5 ChannelHandler

ChannelHandler是Reactor框架提供給使用者進行自定義的介面。介面提供了常用的介面:

public interface ChannelHandler {
    
    void channelActive(NioChannel channel); void channelRead(NioChannel channel, Object msg) throws Exception; void exceptionCaught(NioChannel channel, Throwable t) throws Exception; } 

4. 總結

4.1 軟體設計中的一些注意點

時刻緊繃一根弦:資源是有限的

比如在網路程式設計中,每建立一個Socket連線都會消耗一定資源,當回話結束後一定要關閉。此外,必須考慮非正常流程時的情況。比如發生異常,可能執行不到關閉資源的操作。 如ReactorPool的例項化過程:

    public ReactorPool(int nThreads){ //。。 reactors = new Reactor[nThreads]; for(int i = 0; i < nThreads; i++){ boolean succeed = false; try{ reactors[i] = new Reactor(); succeed = true; }catch(Exception e){ throw new IllegalStateException("failed to create a Reactor", e); }finally{ if (!succeed) { for (int j = 0; j < i; j ++) { reactors[j].close(); } } } } } 

當例項化過程中傳送異常時,記得要及時回收已佔用資源。

又比如在通訊一端接受位元組流的時候需要注意對異常碼流的處理,避免碼流過大而耗盡記憶體,導致OOM。

併發操作分析

  • 這個類是執行緒安全的嗎?
  • 這個方法是在哪個執行緒中執行的?
  • 是否是熱點區域?
  • 是否存在併發修改的可能?
  • 併發修改是否可見?

在單執行緒版的Reactor模型中,所有的邏輯都由Reactor單個執行緒執行,不存在多執行緒併發操作的情況,那麼在我們添加了執行緒池workerPool後,情況又會怎麼樣呢?

一般我們在分析併發性問題,通常的做法是先找到可能被多個執行緒共同訪問的類,再分析下這個類是否是執行緒安全的。如何判斷某個類是否是執行緒安全的?

  1. 該類是否是有狀態的,無狀態的類一定是執行緒安全的。
  2. 如果有狀態,是否可變。如果一個類狀態不可變,那麼肯定也是執行緒安全的。

所謂的狀態暫可以簡單理解為是否有成員變數,不管是靜態成員變數還是普通成員變數。

關於"單一職責"

單一職責原則是面向物件軟體設計的基本原則之一,難點在於介面的職責如何劃分,而職責的劃分又需要具體問題具體考慮。拿本次這個小Demo來說,NioChannel的職責是作為資料傳輸通道,而通道中資料傳輸方式可能有很多種,那麼這裡就抽象出一個NioChannelSink介面負責具體傳輸方式的實現。

職責粒度的劃分需要根據需求好好把控。過粗不利於擴充套件,過細不利於實現。



作者:TopGun_Viper
連結:https://www.jianshu.com/p/847600114337
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。