1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(6)

資料庫路由中介軟體MyCat - 原始碼篇(6)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。

3. 連線模組

3.3 AbstractConnection:

3.3.2 NIOHandler

NIOHandler實際上就是對於業務處理方法的封裝,對於不同的連線有不同的處理方法,也就是不同的NIOHandler

public interface NIOHandler { void handle(byte[] data);
}

它的實現以及子類會在之後的對應的處理模組細講。

3.3.3 NIOSocketWR

實現對於AbstractConnection(實際就是對裡面封裝的channel)進行非同步讀寫,將從channel中讀取到的放到AbstractConnection的readBuffer中,將writeBuffer和寫佇列中的資料寫入到channel中。可以這麼說,AbstractConnection的方法只對它裡面的buffer進行操作,而buffer與channel之間的互動,是通過NIOSocketWR的方法完成的。 下面是它的方法以及對應的說明:


    public void register(Selector selector) throws IOException {        try {
            processKey = channel.register(selector, SelectionKey.OP_READ, con);
        } finally {            if (con.isClosed.get()) {
                clearSelectionKey();
            }
        }
    }    private void clearSelectionKey() {        try {
            SelectionKey key = this.processKey;            if (key != null && key.isValid()) {
                key.attach(null);
                key.cancel();
            }
        } catch (Exception e) {
            AbstractConnection.LOGGER.warn("clear selector keys err:" + e);
        }
    }


呼叫關係:這個方法就是之前講的AbstractionConnection與RW執行緒繫結,AbstractionConnection封裝的channel需要在RW執行緒的selector上註冊讀事件以監聽讀事件。


    public void doNextWriteCheck() {        //檢查是否正在寫,看CAS更新writing值是否成功
        if (!writing.compareAndSet(false, true)) {            return;
        }        try {            //利用快取佇列和寫緩衝記錄保證寫的可靠性,返回true則為全部寫入成功
            boolean noMoreData = write0();            //因為只有一個執行緒可以成功CAS更新writing值,所以這裡不用再CAS
            writing.set(false);            //如果全部寫入成功而且寫入佇列為空(有可能在寫入過程中又有新的Bytebuffer加入到佇列),則取消註冊寫事件
            //否則,繼續註冊寫事件
            if (noMoreData && con.writeQueue.isEmpty()) {                if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) {
                    disableWrite();
                }

            } else {                if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) {
                    enableWrite(false);
                }
            }

        } catch (IOException e) {            if (AbstractConnection.LOGGER.isDebugEnabled()) {
                AbstractConnection.LOGGER.debug("caught err:", e);
            }
            con.close("err:" + e);
        }

    }    private boolean write0() throws IOException {        int written = 0;
        ByteBuffer buffer = con.writeBuffer;        if (buffer != null) {            //只要寫緩衝記錄中還有資料就不停寫入,但如果寫入位元組為0,證明網路繁忙,則退出
            while (buffer.hasRemaining()) {
                written = channel.write(buffer);                if (written > 0) {
                    con.netOutBytes += written;
                    con.processor.addNetOutBytes(written);
                    con.lastWriteTime = TimeUtil.currentTimeMillis();
                } else {                    break;
                }
            }            //如果寫緩衝中還有資料證明網路繁忙,計數並退出,否則清空緩衝
            if (buffer.hasRemaining()) {
                con.writeAttempts++;                return false;
            } else {
                con.writeBuffer = null;
                con.recycle(buffer);
            }
        }        //讀取快取佇列並寫channel
        while ((buffer = con.writeQueue.poll()) != null) {            if (buffer.limit() == 0) {
                con.recycle(buffer);
                con.close("quit send");                return true;
            }
            buffer.flip();            while (buffer.hasRemaining()) {
                written = channel.write(buffer);                if (written > 0) {
                    con.lastWriteTime = TimeUtil.currentTimeMillis();
                    con.netOutBytes += written;
                    con.processor.addNetOutBytes(written);
                    con.lastWriteTime = TimeUtil.currentTimeMillis();
                } else {                    break;
                }
            }            //如果寫緩衝中還有資料證明網路繁忙,計數,記錄下這次未寫完的資料到寫緩衝記錄並退出,否則回收緩衝
            if (buffer.hasRemaining()) {
                con.writeBuffer = buffer;
                con.writeAttempts++;                return false;
            } else {
                con.recycle(buffer);
            }
        }        return true;
    }    private void disableWrite() {        try {
            SelectionKey key = this.processKey;
            key.interestOps(key.interestOps() & OP_NOT_WRITE);
        } catch (Exception e) {
            AbstractConnection.LOGGER.warn("can't disable write " + e + " con "
                    + con);
        }

    }    private void enableWrite(boolean wakeup) {        boolean needWakeup = false;        try {
            SelectionKey key = this.processKey;
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            needWakeup = true;
        } catch (Exception e) {
            AbstractConnection.LOGGER.warn("can't enable write " + e);

        }        if (needWakeup && wakeup) {
            processKey.selector().wakeup();
        }
    }


這個doNextWriteCheck方法之前也講過,看呼叫關係:第一個呼叫關係沒意義,WriteEventCheckRunner這個類從沒被呼叫過。 第二個呼叫很。。。就是將這個方法簡單封裝,估計是為了好修改,之後會提兩種寫策略對比。 第三個呼叫是主要呼叫,所有往AbstractionConnection中寫入都會呼叫Abstraction.write(ByteBuffer),這個方法先把要寫的放入快取佇列,之後呼叫上面這個doNextWriteCheck方法。 第四個和第五個都是定時檢查任務,為了檢查是否有AbstractionConnection的寫快取沒有寫完的情況


@Override
    public void asynRead() throws IOException {
        ByteBuffer theBuffer = con.readBuffer;        //如果buffer為空,證明被回收或者是第一次讀,新分配一個buffer給AbstractConnection作為readBuffer
        if (theBuffer == null) {
            theBuffer = con.processor.getBufferPool().allocate();
            con.readBuffer = theBuffer;
        }        //從channel中讀取資料,並且儲存到對應AbstractConnection的readBuffer中,readBuffer處於write mode,返回讀取了多少位元組
        int got = channel.read(theBuffer);        //呼叫處理讀取到的資料的方法
        con.onReadData(got);
    }


這個方法之前也講過,非同步將channel中的資料讀取到readBuffer中,之後呼叫對應AbstractConnection的處理方法。 呼叫關係:按理說,應該只有在RW執行緒檢測到讀事件之後,才會呼叫這個非同步讀方法。但是在FrontendConnection的register()方法和BackendAIOConnection的register()方法都呼叫了。這是因為這兩個方法在正常工作情況下為了註冊一個會先主動發一個握手包,另一個會先讀取一個握手包。所以都會執行非同步讀方法。


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選




相關文章:
【推薦】 如何玩轉基於風險的測試