1. 程式人生 > >數據庫路由中間件MyCat - 源代碼篇(8)

數據庫路由中間件MyCat - 源代碼篇(8)

工廠方法 處理方法 [] ostc his poll sin sre .html

此文已由作者張鎬薪授權網易雲社區發布。

歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。

3. 連接模塊

3.5 後端連接

對於後端連接,我們只關心MySQL的。 從後端連接工廠開始MySQLConnectionFactory.java:


public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
            String schema) throws IOException {        //DBHost配置
        DBHostConfig dsc = pool.getConfig();        //根據是否為NIO返回SocketChannel或者AIO的AsynchronousSocketChannel
        NetworkChannel channel = openSocketChannel(MycatServer.getInstance()
                .isAIO());        //新建MySQLConnection
        MySQLConnection c = new MySQLConnection(channel, pool.isReadNode());        //根據配置初始化MySQLConnection
        MycatServer.getInstance().getConfig().setSocketParams(c, false);
        c.setHost(dsc.getIp());
        c.setPort(dsc.getPort());
        c.setUser(dsc.getUser());
        c.setPassword(dsc.getPassword());
        c.setSchema(schema);        //目前實際連接還未建立,handler為MySQL連接認證MySQLConnectionAuthenticator
        c.setHandler(new MySQLConnectionAuthenticator(c, handler));
        c.setPool(pool);
        c.setIdleTimeout(pool.getConfig().getIdleTimeout());        //AIO和NIO連接方式建立實際的MySQL連接
        if (channel instanceof AsynchronousSocketChannel) {
            ((AsynchronousSocketChannel) channel).connect(                    new InetSocketAddress(dsc.getIp(), dsc.getPort()), c,
                    (CompletionHandler) MycatServer.getInstance()
                            .getConnector());
        } else {            //通過NIOConnector建立連接
            ((NIOConnector) MycatServer.getInstance().getConnector())
                    .postConnect(c);

        }        return c;
    }


通過NIOConnector建立實際連接的過程與前端連接的建立相似,也是先放在隊列中,之後由NIOConnector去建立連接。


public void postConnect(AbstractConnection c) {
    connectQueue.offer(c);
    selector.wakeup();
}public void run() {    final Selector tSelector = this.selector;    for (;;) {
        ++connectCount;        try {            //查看有無連接就緒
            tSelector.select(1000L);
            connect(tSelector);
            Set<SelectionKey> keys = tSelector.selectedKeys();            try {                for (SelectionKey key : keys) {
                    Object att = key.attachment();                    if (att != null && key.isValid() && key.isConnectable()) {
                        finishConnect(key, att);
                    } else {
                        key.cancel();
                    }
                }
            } finally {
                keys.clear();
            }
        } catch (Exception e) {
            LOGGER.warn(name, e);
        }
    }
}private void connect(Selector selector) {
    AbstractConnection c = null;    while ((c = connectQueue.poll()) != null) {        try {
            SocketChannel channel = (SocketChannel) c.getChannel();            //註冊OP_CONNECT監聽與後端連接是否真正建立
            channel.register(selector, SelectionKey.OP_CONNECT, c);               //主動連接
            channel.connect(new InetSocketAddress(c.host, c.port));
        } catch (Exception e) {
            c.close(e.toString());
        }
    }
}private void finishConnect(SelectionKey key, Object att) {
    BackendAIOConnection c = (BackendAIOConnection) att;    try {        if (finishConnect(c, (SocketChannel) c.channel)) { //做原生NIO連接是否完成的判斷和操作
            clearSelectionKey(key);
            c.setId(ID_GENERATOR.getId());               //綁定特定的NIOProcessor以作idle清理
            NIOProcessor processor = MycatServer.getInstance()
                    .nextProcessor();
            c.setProcessor(processor);               //與特定NIOReactor綁定監聽讀寫
            NIOReactor reactor = reactorPool.getNextReactor();
            reactor.postRegister(c);
        }
    } catch (Exception e) {        //如有異常,將key清空
           clearSelectionKey(key);
           c.close(e.toString());
        c.onConnectFailed(e);
    }
}private boolean finishConnect(AbstractConnection c, SocketChannel channel)
        throws IOException {    if (channel.isConnectionPending()) {
        channel.finishConnect();
        c.setLocalPort(channel.socket().getLocalPort());        return true;
    } else {        return false;
    }
}private void clearSelectionKey(SelectionKey key) {    if (key.isValid()) {
        key.attach(null);
        key.cancel();
    }
}

綁定到具體的NIOReactor之後,監聽讀事件。和之前講的前端連接建立過程類似。這次是後端MySQL主動發握手包。這時,讀事件就緒,NIOReactor中的RW線程會調用對應AbstractConnection(這裏是MySQLConnection)的handler的處理方法處理。這裏MySQLConnection中的handler參考工廠方法發現是MySQLConnectionAuthenticator。查看handle方法:


/**
 * MySQL 4.1版本之前是MySQL323加密,MySQL 4.1和之後的版本都是MySQLSHA1加密,在MySQL5.5的版本之後可以客戶端插件式加密(這個MyCat實現)
 * @see @http://dev.mysql.com/doc/internals/en/determining-authentication-method.html
 */@Overridepublic void handle(byte[] data) {    try {        switch (data[4]) {        //如果是OkPacket,檢查是否認證成功
        case OkPacket.FIELD_COUNT:
            HandshakePacket packet = source.getHandshake();            if (packet == null) {                //如果為null,證明鏈接第一次建立,處理
                processHandShakePacket(data);                // 發送認證數據包
                source.authenticate();                break;
            }            // 如果packet不為null,處理認證結果
            //首先將連接設為已驗證並將handler改為MySQLConnectionHandler
            source.setHandler(new MySQLConnectionHandler(source));
            source.setAuthenticated(true);            //判斷是否用了壓縮協議
            boolean clientCompress = Capabilities.CLIENT_COMPRESS==(Capabilities.CLIENT_COMPRESS & packet.serverCapabilities);            boolean usingCompress= MycatServer.getInstance().getConfig().getSystem().getUseCompression()==1 ;            if(clientCompress&&usingCompress)
            {
                source.setSupportCompress(true);
            }            //設置ResponseHandler
            if (listener != null) {
                listener.connectionAcquired(source);
            }            break;        //如果為ErrorPacket,則認證失敗
        case ErrorPacket.FIELD_COUNT:
            ErrorPacket err = new ErrorPacket();
            err.read(data);
            String errMsg = new String(err.message);
            LOGGER.warn("can‘t connect to mysql server ,errmsg:"+errMsg+" "+source);            //source.close(errMsg);
            throw new ConnectionException(err.errno, errMsg);        //如果是EOFPacket,則為MySQL 4.1版本,是MySQL323加密
        case EOFPacket.FIELD_COUNT:
            auth323(data[3]);            break;        default:
            packet = source.getHandshake();            if (packet == null) {
                processHandShakePacket(data);                // 發送認證數據包
                source.authenticate();                break;
            } else {                throw new RuntimeException("Unknown Packet!");
            }

        }

    } catch (RuntimeException e) {        if (listener != null) {
            listener.connectionError(e, source);            return;
        }        throw e;
    }
}

在連接建立並認證後,MySQLConnectionHandler來處理這個連接的請求和相應。 MySQL服務端響應客戶端查詢請求的流程如下:技術分享圖片可以分為三個階段: (第一階段)客戶端發送查詢請求包COM_QUERY (command query packet),如果有結果集返回,且結果集不為空,則返回FieldCount(列數量)包;如果結果集為空,則返回OKPacket;如果命令有錯,則返回ERRPacket;如果是Load file data命令,則返回LOCAL_INFILE_Request。 (第二階段)如果有結果集返回,則先返回列集合,所有列返回完了之後,會返回EOFPacket;如果過程中出現錯誤,則返回錯誤包。 (第三階段)之後返回行記錄,返回全部行記錄之後,返回EOFPacket。如果有錯誤,回錯誤包。 MyCat實現源代碼如下:


protected void handleData(byte[] data) {    switch (resultStatus) {        //第一階段
        case RESULT_STATUS_INIT:
            switch (data[4]) {                //返回OKPacket
                case OkPacket.FIELD_COUNT:
                    handleOkPacket(data);                    break;                //返回錯誤包
                case ErrorPacket.FIELD_COUNT:
                    handleErrorPacket(data);                    break;                //返回Load Data進一步操作
                case RequestFilePacket.FIELD_COUNT:
                    handleRequestPacket(data);                    break;                //返回結果集列數量                default:
                    //記錄列數量並進入第二階段
                    resultStatus = RESULT_STATUS_HEADER;
                    header = data;
                    fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data,                            4));
            }            break;        //第二階段
        case RESULT_STATUS_HEADER:
            switch (data[4]) {                //返回錯誤包
                case ErrorPacket.FIELD_COUNT:
                    resultStatus = RESULT_STATUS_INIT;
                    handleErrorPacket(data);                    break;                //返回EOF,證明列集合返回完畢,進入第三階段
                case EOFPacket.FIELD_COUNT:
                    resultStatus = RESULT_STATUS_FIELD_EOF;
                    handleFieldEofPacket(data);                    break;                //返回的是列集合,記錄                default:
                    fields.add(data);
            }            break;        //第三階段
        case RESULT_STATUS_FIELD_EOF:
            switch (data[4]) {                //返回錯誤包
                case ErrorPacket.FIELD_COUNT:
                    resultStatus = RESULT_STATUS_INIT;
                    handleErrorPacket(data);                    break;                //返回EOF,證明結果集返回完畢,回到第一階段等待下一個請求的響應
                case EOFPacket.FIELD_COUNT:
                    resultStatus = RESULT_STATUS_INIT;
                    handleRowEofPacket(data);                    break;                //返回結果集包                default:
                    handleRowPacket(data);
            }            break;        default:
            throw new RuntimeException("unknown status!");
    }
}


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點擊。




相關文章:
【推薦】 IOS渠道追蹤方式

數據庫路由中間件MyCat - 源代碼篇(8)