1. 程式人生 > >Reactor模型-多執行緒版




  1. 接受客戶端連線請求的不再是單個執行緒-Acceptor,而是一個NIO執行緒池。
  2. I/O處理也不再是單個執行緒處理,而是交給一個I/O執行緒池進行處理。


Key Word

Java NIO 事件驅動 主從Reactor模型



public class SimpleServerChannelHandler implements ChannelHandler { private static Logger LOG = LoggerFactory.getLogger(SimpleServerChannelHandler.class); //記錄接受訊息的次數 public volatile int receiveSize; //記錄丟擲的異常 public volatile Throwable t; @Override public void channelActive(NioChannel channel) { if(LOG.isDebugEnabled()){ LOG.debug("ChannelActive"); } } @Override public void channelRead(NioChannel channel, Object msg) throws Exception { ByteBuffer bb = (ByteBuffer)msg; byte[] con = new byte[bb.remaining()]; bb.get(con); String str = new String(con,0,con.length); String resp = ""; switch(str){ case "request1":resp = "response1";break; case "request2":resp = "response2";break; case "request3":resp = "response3";break; default :resp = "Hello Client"; } ByteBuffer buf = ByteBuffer.allocate(resp.getBytes().length); buf.put(resp.getBytes()); receiveSize++; channel.sendBuffer(buf); } @Override public void exceptionCaught(NioChannel channel, Throwable t) throws Exception { this.t = t; channel.close(); } } 


/**
 * End-to-end tests for the reactor server: starts one server and one client for
 * the whole class and exchanges framed messages between them.
 */
public class ReactorTest extends BaseTest {

    private static final Logger LOG = LoggerFactory.getLogger(ReactorTest.class);
    private static final String HOST = "localhost";
    private static final int PORT = 8888;

    private static Client client;
    private static Server server;
    static SimpleServerChannelHandler h;

    @BeforeClass
    public static void setUp() throws Exception {
        startServer();
        startClient();
    }

    /** Builds a server with a main and a sub reactor pool and binds it to HOST:PORT. */
    private static void startServer() throws Exception {
        server = new Server();
        ReactorPool mainReactor = new ReactorPool();
        ReactorPool subReactor = new ReactorPool();
        h = new SimpleServerChannelHandler();
        server.reactor(mainReactor, subReactor)
              .handler(h)
              .bind(new InetSocketAddress(HOST, PORT));
    }

    private static void startClient() throws SocketException {
        client = new Client();
        client.socket().setTcpNoDelay(true);
        client.connect(new InetSocketAddress(HOST, PORT));
    }

    /** Drains the readable bytes of {@code resp} into a string. */
    private static String readAll(ByteBuffer resp) {
        byte[] res = new byte[resp.remaining()];
        resp.get(res);
        return new String(res, 0, res.length);
    }

    @Test
    public void test() {
        LOG.info("Sucessful configuration");
    }

    @Test
    public void testBaseFunction() {
        LOG.debug("testBaseFunction()");
        String msg = "Hello Reactor";
        ByteBuffer resp = client.syncSend(ByteBuffer.wrap(msg.getBytes()));
        Assert.assertEquals("Hello Client", readAll(resp));
    }

    @Test
    public void testMultiSend() {
        int sendSize = 1024;
        // Other tests also bump the shared counter, so compare against a baseline
        // instead of the absolute value (JUnit does not guarantee test order).
        int before = h.receiveSize;
        for (int i = 0; i < sendSize; i++) {
            ByteBuffer bb = ByteBuffer.wrap("Hello Reactor".getBytes());
            ByteBuffer resp = client.syncSend(bb);
            Assert.assertEquals("Hello Client", readAll(resp));
        }
        Assert.assertEquals(sendSize, h.receiveSize - before);
    }

    @Test
    public void testTooLongReceivedByteSizeEexception() {
        LOG.debug("testTooLongReceivedByteSizeEexception()");
        int threshold = 1024;
        byte[] dest = new byte[threshold + 1];
        Random r = new Random();
        r.nextBytes(dest);
        client.syncSend(ByteBuffer.wrap(dest));
        // Fail with a readable message rather than a NullPointerException if nothing was recorded.
        Assert.assertNotNull("server handler did not record an exception", h.t);
        Assert.assertEquals(IllegalArgumentException.class, h.t.getClass());
        Assert.assertEquals("Illegal data length, len:" + (threshold + 1), h.t.getMessage());
    }

    @AfterClass
    public static void tearDown() throws Exception {
        try {
            server.close();
        } finally {
            // Close the client even when closing the server fails.
            client.close();
        }
    }
}









3.1 Reactor和ReactorPool


public class Reactor extends Thread{ //... private Selector selector; private volatile boolean isShutdown; Reactor(){ try { selector = Selector.open(); } catch (IOException e) { throw new RuntimeException("failed to open a new selector", e); } } @Override public void run() { for(;;){ try { getSelector().select(wakenUp); Set<SelectionKey> keys; synchronized(this){ keys = getSelector().selectedKeys(); } Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()){ SelectionKey key = it.next(); processSelectedKey(key); it.remove(); } if(isShutdown()){ break; } } catch (Throwable e) { LOG.warn("Unexpected exception in the selector loop.", e); try { Thread.sleep(1000); } catch (InterruptedException e1) { } } } } } 


    private void processSelectedKey(SelectionKey key){ try { NioChannel nioChannel = (NioChannel)key.attachment(); if (!nioChannel.isOpen()) { LOG.warn("trying to do i/o on a null socket"); return; } int readyOps = key.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { nioChannel.sink().doRead(); } if((readyOps & SelectionKey.OP_WRITE) != 0){ nioChannel.sink().doSend(); } if((readyOps & SelectionKey.OP_CONNECT) != 0){ //remove OP_CONNECT key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT)); } }catch (Throwable t) { if (LOG.isDebugEnabled()) { LOG.debug("Throwable stack trace", t); } closeSocket(); } } 



    /**
     * Registers the given channel with this reactor's selector.
     *
     * @param sc          channel wrapper to register; must not be {@code null}
     * @param interestOps interest set for the new key; must be non-zero
     * @param attachment  object to attach to the resulting key
     * @return the key produced by the registration
     * @throws NullPointerException     if {@code sc} is {@code null}
     * @throws IllegalArgumentException if {@code interestOps} is zero
     * @throws RuntimeException         if the underlying channel is already closed
     */
    public SelectionKey register(final NioChannel sc, final int interestOps, Object attachment){
        if (null == sc) {
            throw new NullPointerException("SelectableChannel");
        }
        if (0 == interestOps) {
            throw new IllegalArgumentException("interestOps must be non-zero.");
        }

        try {
            return sc.channel().register(getSelector(), interestOps, attachment);
        } catch (ClosedChannelException closed) {
            throw new RuntimeException("failed to register a channel", closed);
        }
    }


public class ReactorPool {

    private static final Logger LOG = LoggerFactory.getLogger(ReactorPool.class); private Reactor[] reactors; private AtomicInteger index = new AtomicInteger(); //執行緒數預設為CPU數*2 private final int DEFAULT_THREADS = Runtime.getRuntime().availableProcessors() * 2; public ReactorPool (){ this(0); } public ReactorPool(int nThreads){ if(nThreads < 0){ throw new IllegalArgumentException("nThreads must be nonnegative number"); } if(nThreads == 0){ nThreads = DEFAULT_THREADS; } reactors = new Reactor[nThreads]; for(int i = 0; i < nThreads; i++){ boolean succeed = false; try{ reactors[i] = new Reactor(); succeed = true; }catch(Exception e){ throw new IllegalStateException("failed to create a Reactor", e); }finally{ if (!succeed) { for (int j = 0; j < i; j ++) { reactors[j].close(); } } } } } public Reactor next(){ return reactors[index.incrementAndGet() % reactors.length]; } public void close(){ for(int i = 0; i < reactors.length; i++){ reactors[i].setShutdown(true); reactors[i].close(); } } } 

3.2 NioChannel和NioChannelSink


  • java.nio.channels.SocketChannel
  • java.nio.channels.ServerSocketChannel


  • 聚合一個SelectableChannel型別(SocketChannel和ServerSocketChannel的公共父類)的成員變數。
  • 持有一個所屬Reactor物件的引用
  • 聚合一個NioChannelSink型別成員變數。

NioChannelSink是將NioChannel的底層讀寫功能獨立出來。一方面使NioChannel避免整合過多功能而顯得臃腫,另一方面分離出底層傳輸協議,為以後底層傳輸協議的切換做準備。(TCP vs UDP,NIO、OIO、AIO)從這種意義上說,NioChannel取名為Channel貌似更合理。

public abstract class NioChannel { protected Reactor reactor; protected SelectableChannel sc; protected SelectionKey selectionKey; private NioChannelSink sink; protected volatile ChannelHandler handler; public NioChannel(SelectableChannel sc, int interestOps){ this.sc = sc; try { sc.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); } sink = nioChannelSink(); } protected void fireChannelRead(ByteBuffer bb){ try { handler.channelRead(this, bb); } catch (Exception e) { fireExceptionCaught(e); } } protected void fireExceptionCaught(Throwable t){ try { handler.exceptionCaught(this, t); } catch (Exception e) { e.printStackTrace(); } } //。。。 public abstract NioChannelSink nioChannelSink(); public interface NioChannelSink{ void doRead(); void doSend(); void sendBuffer(ByteBuffer bb); void close(); } } 



/**
 * Attaches this channel to the given reactor: remembers the reactor, then
 * registers the underlying channel with the reactor's selector, using this
 * object as the key attachment. A closed channel is only reported by printing
 * the stack trace, in which case {@code selectionKey} is left unchanged.
 *
 * @param reactor     the reactor that will poll this channel
 * @param interestOps the interest set to register
 */
public void register(Reactor reactor, int interestOps){
    this.reactor = reactor;
    try {
        this.selectionKey = this.sc.register(reactor().getSelector(), interestOps, this);
    } catch (ClosedChannelException alreadyClosed) {
        alreadyClosed.printStackTrace();
    }
}

這裡將NioChannel物件作為附件,在Reactor中心輪詢到ready事件後,會根據事件的型別(OP_ACCEPT OP_READ等),從SelectionKey中取出繫結的附件NioChannel

// Recover the NioChannel that was stored as the key's attachment when the channel was registered.
NioChannel nioChannel = (NioChannel)key.attachment();



/*
 * bind(remoteAddress):    binds the underlying channel to the given address. The server
 *                         channel implements it; the socket channel throws
 *                         UnsupportedOperationException.
 * connect(remoteAddress): connects the underlying channel to the given address. The socket
 *                         channel implements it; the server channel throws
 *                         UnsupportedOperationException.
 * NOTE(review): bind's parameter is named remoteAddress although it is passed to
 * ServerSocketChannel.bind, i.e. it looks like a local address - confirm the naming.
 */
public abstract void bind(InetSocketAddress remoteAddress) throws Exception; public abstract void connect(InetSocketAddress remoteAddress) throws Exception; 



/**
 * Hands a buffer to the sink for sending.
 *
 * @param bb the data to send
 */
public void sendBuffer(ByteBuffer bb){
    sink().sendBuffer(bb);
}

/** Adds {@code OP_WRITE} to the key's interest set if it is not already there. */
protected final void enableWrite(){
    int ops = selectionKey.interestOps();
    if ((ops & SelectionKey.OP_WRITE) == 0) {
        selectionKey.interestOps(ops | SelectionKey.OP_WRITE);
    }
}

/**
 * Removes {@code OP_WRITE} from the key's interest set if it is present.
 *
 * <p>The mask test must be {@code != 0}: {@code OP_WRITE} is the bit {@code 1 << 2},
 * so the masked value is either 0 or 4 and a comparison with 1 never matches,
 * which left write interest switched on forever.
 */
protected final void disableWrite(){
    int ops = selectionKey.interestOps();
    if ((ops & SelectionKey.OP_WRITE) != 0) {
        selectionKey.interestOps(ops & ~SelectionKey.OP_WRITE);
    }
}

3.3 NioServerSocketChannel和NioSocketChannel


public class NioServerSocketChannel extends NioChannel{ private static final Logger LOG = LoggerFactory.getLogger(NioServerSocketChannel.class); public NioServerSocketChannel(){ super(newSocket()); } public static ServerSocketChannel newSocket(){ ServerSocketChannel socketChannel = null; try { socketChannel = ServerSocketChannel.open(); } catch (IOException e) { LOG.error("Unexpected exception occur when open ServerSocketChannel"); } return socketChannel; } @Override public NioChannelSink nioChannelSink() { return new NioServerSocketChannelSink(); } class NioServerSocketChannelSink implements NioChannelSink{ //。。。 } @Override public void bind(InetSocketAddress remoteAddress) throws Exception { ServerSocketChannel ssc = (ServerSocketChannel)sc; ssc.bind(remoteAddress); } @Override public void connect(InetSocketAddress remoteAddress) throws Exception { throw new UnsupportedOperationException(); } } 

這裡獲取ServerSocketChannel例項的方式是通過ServerSocketChannel.open(),其實也可以通過反射來獲取,這樣就能將ServerSocketChannel和SocketChannel的例項化邏輯進行統一,我們只需要在例項化Channel的時候將ServerSocketChannel.class 或 SocketChannel.class當作引數傳入即可。


public class NioSocketChannel extends NioChannel{ private static final Logger LOG = LoggerFactory.getLogger(NioSocketChannel.class); public NioSocketChannel() throws IOException{ super( newSocket()); } public NioSocketChannel(SocketChannel sc) throws IOException{ super(sc); } public static SocketChannel newSocket(){ SocketChannel socketChannel = null; try { socketChannel = SocketChannel.open(); } catch (IOException e) { } return socketChannel; } @Override public NioChannelSink nioChannelSink() { return new NioSocketChannelSink(); } class NioSocketChannelSink implements NioChannelSink{ //。。。 } @Override public void bind(InetSocketAddress remoteAddress) throws Exception { throw new UnsupportedOperationException(); } @Override public void connect(InetSocketAddress remoteAddress) throws Exception { SocketChannel socketChannel = (SocketChannel)sc; socketChannel.connect(remoteAddress); } } 

3.4 NioServerSocketChannelSink和NioSocketChannelSink



    /**
     * Low-level I/O operations of a channel, separated from the channel itself.
     * <ul>
     *   <li>{@code doRead()} - server sink: accepts a client connection;
     *       socket sink: reads data from the channel.</li>
     *   <li>{@code doSend()} - socket sink: writes buffered data to the channel;
     *       not supported by the server sink.</li>
     *   <li>{@code sendBuffer(bb)} - socket sink: queues the data for sending;
     *       not supported by the server sink.</li>
     *   <li>{@code close()} - closes the channel.</li>
     * </ul>
     */
    public interface NioChannelSink{

        void doRead(); void doSend(); void sendBuffer(ByteBuffer bb); void close(); } 



  • NioServerSocketChannelSink:通過accept()接受客戶端的請求。
  • NioSocketChannelSink:讀取NioChannel中的資料


  • NioServerSocketChannelSink:不支援。
  • NioSocketChannelSink:將緩衝區中資料寫入NioChannel


  • NioServerSocketChannelSink:不支援。
  • NioSocketChannelSink:傳送資料,其實就是將待發送資料加入緩衝佇列中。


  • NioServerSocketChannelSink:關閉Channel。
  • NioSocketChannelSink:同上。



public class NioServerSocketChannel extends NioChannel{ //。。。 class NioServerSocketChannelSink implements NioChannelSink{ public void doRead() { try { ServerSocketChannel ssc = (ServerSocketChannel)sc; handler.channelRead(NioServerSocketChannel.this, new NioSocketChannel(ssc.accept())); if(LOG.isDebugEnabled()){ LOG.debug("Dispatch the SocketChannel to SubReactorPool"); } } catch (Exception e1) { e1.printStackTrace(); } } public void doSend(){ throw new UnsupportedOperationException(); } @Override public void sendBuffer(ByteBuffer bb) { throw new UnsupportedOperationException(); } @Override public void close() { try { if(sc != null){ sc.close(); } } catch (IOException e) { e.printStackTrace(); } } }// end NioChannelSink //。。。 } 


public class NioSocketChannel extends NioChannel{ //。。。 class NioSocketChannelSink implements NioChannelSink{ private static final int MAX_LEN = 1024; ByteBuffer lenBuffer = ByteBuffer.allocate(4); ByteBuffer inputBuffer = lenBuffer; ByteBuffer outputDirectBuffer = ByteBuffer.allocateDirect(1024 * 64); LinkedBlockingQueue<ByteBuffer> outputQueue = new LinkedBlockingQueue<ByteBuffer>(); public void close(){ //clear buffer outputDirectBuffer = null; try { if(sc != null){ sc.close(); } } catch (IOException e) { e.printStackTrace(); } } public void doRead() { SocketChannel socketChannel = (SocketChannel)sc; int byteSize; try { byteSize = socketChannel.read(inputBuffer); if(byteSize < 0){ LOG.error("Unable to read additional data"); throw new RuntimeException("Unable to read additional data"); } if(!inputBuffer.hasRemaining()){ if(inputBuffer == lenBuffer){ //read length lenBuffer.flip(); int len = lenBuffer.getInt(); if(len < 0 || len > MAX_LEN){ throw new IllegalArgumentException("Illegal data length, len:" + len); } //prepare for receiving data inputBuffer = ByteBuffer.allocate(len); inputBuffer.clear(); }else{ //read data if(inputBuffer.hasRemaining()){ socketChannel.read(inputBuffer); } if(!inputBuffer.hasRemaining()){ inputBuffer.flip(); fireChannelRead(inputBuffer); //clear lenBuffer and waiting for next reading operation lenBuffer.clear(); inputBuffer = lenBuffer; } } } } catch (Throwable t) { if(LOG.isDebugEnabled()){ LOG.debug("Exception :" + t); } fireExceptionCaught(t); } } public void doSend(){ /** * write data to channel: * step 1: write the length of data(occupy 4 byte) * step 2: data content */ try { if(outputQueue.size() > 0){ ByteBuffer directBuffer = outputDirectBuffer; directBuffer.clear(); for(ByteBuffer buf : outputQueue){ buf.flip(); if(buf.remaining() > directBuffer.remaining()){ //prevent BufferOverflowException buf = (ByteBuffer) buf.slice().limit(directBuffer.remaining()); } //transfers the bytes remaining in buf into directBuffer int p = 
buf.position(); directBuffer.put(buf); //reset position buf.position(p); if(!directBuffer.hasRemaining()){ break; } } directBuffer.flip(); int sendSize = ((SocketChannel)sc).write(directBuffer); while(!outputQueue.isEmpty()){ ByteBuffer buf = outputQueue.peek(); int left = buf.remaining() - sendSize; if(left > 0){ buf.position(buf.position() + sendSize); break; } sendSize -= buf.remaining(); outputQueue.remove(); } } synchronized(reactor){ if(outputQueue.size() == 0){ //disable write disableWrite(); }else{ //enable write enableWrite(); } } } catch (Throwable t) { fireExceptionCaught(t); } } private ByteBuffer wrapWithHead(ByteBuffer bb){ bb.flip(); lenBuffer.clear(); int len = bb.remaining(); lenBuffer.putInt(len); ByteBuffer resp = ByteBuffer.allocate(len+4); lenBuffer.flip(); resp.put(lenBuffer); resp.put(bb); return resp; } public void sendBuffer(ByteBuffer bb){ try{ synchronized(this){ //wrap ByteBuffer with length header ByteBuffer wrapped = wrapWithHead(bb); outputQueue.add(wrapped); enableWrite(); } }catch(Exception e){ LOG.error("Unexcepted Exception: ", e); } } }// end NioSocketChannelSink //。。。 } 


3.5 ChannelHandler


/**
 * Callbacks through which a channel reports events to application code.
 * <ul>
 *   <li>{@code channelActive} - presumably called when a channel becomes usable;
 *       the call site is not part of this excerpt.</li>
 *   <li>{@code channelRead} - called with a received message: a ByteBuffer for
 *       socket channels, a newly accepted NioSocketChannel for the server channel.</li>
 *   <li>{@code exceptionCaught} - called when reading, sending or a handler
 *       callback fails.</li>
 * </ul>
 */
public interface ChannelHandler {
    void channelActive(NioChannel channel); void channelRead(NioChannel channel, Object msg) throws Exception; void exceptionCaught(NioChannel channel, Throwable t) throws Exception; } 

4. 總結

4.1 軟體設計中的一些注意點


比如在網路程式設計中,每建立一個Socket連線都會消耗一定資源,當會話結束後一定要關閉。此外,必須考慮非正常流程時的情況。比如發生異常,可能執行不到關閉資源的操作。 如ReactorPool的例項化過程:

    public ReactorPool(int nThreads){ //。。 reactors = new Reactor[nThreads]; for(int i = 0; i < nThreads; i++){ boolean succeed = false; try{ reactors[i] = new Reactor(); succeed = true; }catch(Exception e){ throw new IllegalStateException("failed to create a Reactor", e); }finally{ if (!succeed) { for (int j = 0; j < i; j ++) { reactors[j].close(); } } } } } 




  • 這個類是執行緒安全的嗎?
  • 這個方法是在哪個執行緒中執行的?
  • 是否是熱點區域?
  • 是否存在併發修改的可能?
  • 併發修改是否可見?



  1. 該類是否是有狀態的,無狀態的類一定是執行緒安全的。
  2. 如果有狀態,是否可變。如果一個類狀態不可變,那麼肯定也是執行緒安全的。




