1. 程式人生 > >資料庫路由中介軟體MyCat - 原始碼篇(9)

資料庫路由中介軟體MyCat - 原始碼篇(9)

此文已由作者張鎬薪授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。

3. 連線模組

3.5 後端連線

3.5.1 後端連接獲取與負載均衡

上一節我們講了後端連線的基本建立和響應處理,那麼這些後端連線是什麼時候建立的呢? 首先,MyCat配置檔案中,DataHost標籤中有minIdle這個屬性。代表在MyCat初始化時,會在這個DataHost上初始化維護多少個連線(這些連線可以理解為連線池)。每個前端Client連線會建立Session,而Session會根據命令的不同而建立不同的Handler。每個Handler會從連線池中拿出所需要的連線並使用。在連線池大小不夠時,RW執行緒會非同步驅使新建所需的連線補充連線池,但是連線數最大不能超過配置的maxCon。同時,如之前所述,有定時執行緒檢查並回收空閒後端連線。但池中最小不會小於minCon。 我們可以通過後端連線的工廠方法的呼叫鏈來理解: 這裡寫圖片描述

看這個呼叫鏈,我們簡述下大概的流程。

st=>start: MyCat接受客戶端連線併為之建立唯一繫結的Session
e=>end: 將請求傳送給對應連線,處理完之後歸還連線
op1=>operation: MyCat接受客戶端的請求,計算路由
op2=>operation: 根據請求和路由建立合適的handler,這裡為SingleNodeHandler
op3=>operation: 從PhysicalDBNode中獲取後端連線
cond=>condition: 嘗試獲取連線,連線夠用?
op4=>operation: 嘗試非同步建立新的連線
op5=>operation: 通過DelegateResponseHandler將連線與之前的Handler,這裡是SingleNodeHandler繫結
st->op1->op2->op3->condcond(yes)->econd(no)->op4->op5->e

我們先從Session看起,在MyCat中實現類為NonBlockingSession。在前端連線建立時,會建立繫結唯一的Session: ServerConnectionFactory.java:

protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
    SystemConfig sys = MycatServer.getInstance().getConfig().getSystem();
    ServerConnection c = new ServerConnection(channel);
    MycatServer.getInstance().getConfig().setSocketParams(c, true);
    c.setPrivileges(MycatPrivileges.instance());
    c.setQueryHandler(new ServerQueryHandler(c));
    c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c));    // c.setPrepareHandler(new ServerPrepareHandler(c));
    c.setTxIsolation(sys.getTxIsolation());    //建立繫結唯一Session
    c.setSession2(new NonBlockingSession(c));    return c;
}

Session主要處理事務,多節點轉發協調等,由不同的ResponseHandler實現; 這裡寫圖片描述 這些ResponseHandler我們之後會在對應的模組去細細分析。這裡先跳過。 檢視SingleNodeHanlder的處理方法 SingleNodeHanlder.java:

public void execute() throws Exception {    //從這裡開始計算處理時間
    startTime=System.currentTimeMillis();
    ServerConnection sc = session.getSource();    this.isRunning = true;    this.packetId = 0;    final BackendConnection conn = session.getTarget(node);    //之前是否獲取過Connection並且Connection有效
    if (session.tryExistsCon(conn, node)) {
        _execute(conn);
    } else {        // create new connection
        MycatConfig conf = MycatServer.getInstance().getConfig();        //從config中獲取DataNode
        PhysicalDBNode dn = conf.getDataNodes().get(node.getName());        //獲取對應的資料庫連線
        dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this,
                   node);
    }

}

從PhysicalDBNode中獲取合適的連線:

public void getConnection(String schema,boolean autoCommit, RouteResultsetNode rrs,
        ResponseHandler handler, Object attachment) throws Exception {
    checkRequest(schema);    //檢查資料庫連線池是否初始化成功,因為有reload命令
    if (dbPool.isInitSuccess()) {        //根據是否能在讀節點上執行獲取連線,一般是判斷是否為讀請求,並且讀請求不在事務中
        if (rrs.canRunnINReadDB(autoCommit)) {            //根據負載均衡策略選擇合適的後端連線
            dbPool.getRWBanlanceCon(schema,autoCommit, handler, attachment,                    this.database);
        } else {            //直接選擇當前連線池中的的後端連線
            dbPool.getSource().getConnection(schema,autoCommit, handler, attachment);
        }

    } else {        throw new IllegalArgumentException("Invalid DataSource:"
                + dbPool.getActivedIndex());
    }
}

PhysicalDBPool類中有負載均衡,切換writeHost,控制write方式等(分別對應balance,writeType等標籤)的實現。首先我們看如果有負載均衡策略(配置了balance)的獲取連線的方式:

public void getRWBanlanceCon(String schema, boolean autocommit,
                             ResponseHandler handler, Object attachment, String database) throws Exception {

    PhysicalDatasource theNode = null;
    ArrayList<PhysicalDatasource> okSources = null;    switch (banlance) {        //所有讀寫節點參與read請求的負載均衡,除了當前活躍的寫節點,balance=1
        case BALANCE_ALL_BACK: {            //返回所有寫節點和符合條件的讀節點,不包括當前的寫節點
            okSources = getAllActiveRWSources(true, false, checkSlaveSynStatus());            if (okSources.isEmpty()) {                //如果結果即為空,返回當前寫節點
                theNode = this.getSource();
            } else {                //不為空,隨機選一個
                theNode = randomSelect(okSources);
            }            break;
        }        //所有讀寫節點參與read請求的負載均衡,balance=2
        case BALANCE_ALL: {            //返回所有寫節點和符合條件的讀節點
            okSources = getAllActiveRWSources(true, true, checkSlaveSynStatus());            //隨機選一個
            theNode = randomSelect(okSources);            break;
        }        case BALANCE_ALL_READ: {            //返回所有符合條件的讀節點
            okSources = getAllActiveRWSources(false, false, checkSlaveSynStatus());            //隨機取一個
            theNode = randomSelect(okSources);            break;
        }        //不做負載均衡,balance=0或其他不為以上的值
        case BALANCE_NONE:        default:
            // return default write data source
            theNode = this.getSource();
    }    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("select read source " + theNode.getName() + " for dataHost:" + this.getHostName());
    }
    theNode.getConnection(schema, autocommit, handler, attachment);
}

其中涉及到的方法:

  1. 返回符合條件節點集:

private ArrayList<PhysicalDatasource> getAllActiveRWSources(            boolean includeWriteNode, boolean includeCurWriteNode, boolean filterWithSlaveThreshold) {        int curActive = activedIndex;
        ArrayList<PhysicalDatasource> okSources = new ArrayList<PhysicalDatasource>(this.allDs.size());        //判斷寫節點
        for (int i = 0; i < this.writeSources.length; i++) {
            PhysicalDatasource theSource = writeSources[i];            //判斷寫節點是否是active,可能reload會置為inactive,可能多個寫節點但是隻有一個是活躍在用的(writeType=0)
            if (isAlive(theSource)) {                //負載均衡策略是否包含寫節點
                if (includeWriteNode) {                    //判斷是否包含當前活躍的寫入節點
                    if (i == curActive && includeCurWriteNode == false) {                        // not include cur active source
                    } else if (filterWithSlaveThreshold) {                        //如果包含從節點同步延遲限制,檢查同步狀態
                        if (canSelectAsReadNode(theSource)) {
                            okSources.add(theSource);
                        } else {                            //如果同步狀態不對,則不新增這個寫節點
                            continue;
                        }

                    } else {
                        okSources.add(theSource);
                    }
                }                //檢查theSource對應的讀節點
                if (!readSources.isEmpty()) {                    // 檢查theSource對應的讀節點(從節點)
                    PhysicalDatasource[] allSlaves = this.readSources.get(i);                    if (allSlaves != null) {                        for (PhysicalDatasource slave : allSlaves) {                            if (isAlive(slave)) {                                //如果包含從節點同步延遲限制,檢查同步狀態
                                if (filterWithSlaveThreshold) {                                    if (canSelectAsReadNode(slave)) {                                        //如果同步狀態正確,則把讀節點加入
                                        okSources.add(slave);
                                    } else {                                        continue;
                                    }

                                } else {
                                    okSources.add(slave);
                                }
                            }
                        }
                    }
                }

            } else {                // TODO : add by zhuam
                // 如果寫節點不OK, 也要保證臨時的讀服務正常
                if (this.dataHostConfig.isTempReadHostAvailable()) {                    if (!readSources.isEmpty()) {                        // check all slave nodes
                        PhysicalDatasource[] allSlaves = this.readSources.get(i);                        if (allSlaves != null) {                            for (PhysicalDatasource slave : allSlaves) {                                if (isAlive(slave)) {                                    if (filterWithSlaveThreshold) {                                        if (canSelectAsReadNode(slave)) {
                                            okSources.add(slave);
                                        } else {                                            continue;
                                        }

                                    } else {
                                        okSources.add(slave);
                                    }
                                }
                            }
                        }
                    }
                }
            }

        }        return okSources;
    }
  1. 檢查是否判斷主從延遲:

private boolean checkSlaveSynStatus() {        return (dataHostConfig.getSlaveThreshold() != -1)
                && (dataHostConfig.getSwitchType() == DataHostConfig.SYN_STATUS_SWITCH_DS);
    }
  1. 隨機選擇節點:

/**
     * TODO: modify by zhuam
     * <p/>
     * 隨機選擇,按權重設定隨機概率。
     * 在一個截面上碰撞的概率高,但呼叫量越大分佈越均勻,而且按概率使用權重後也比較均勻,有利於動態調整提供者權重。
     *
     * @param okSources
     * @return
     */
    public PhysicalDatasource randomSelect(ArrayList<PhysicalDatasource> okSources) {        if (okSources.isEmpty()) {            return this.getSource();

        } else {            int length = okSources.size();    // 總個數
            int totalWeight = 0;            // 總權重
            boolean sameWeight = true;        // 權重是否都一樣
            for (int i = 0; i < length; i++) {                int weight = okSources.get(i).getConfig().getWeight();
                totalWeight += weight;        // 累計總權重
                if (sameWeight && i > 0
                        && weight != okSources.get(i - 1).getConfig().getWeight()) {      // 計算所有權重是否一樣
                    sameWeight = false;
                }
            }            if (totalWeight > 0 && !sameWeight) {                // 如果權重不相同且權重大於0則按總權重數隨機
                int offset = random.nextInt(totalWeight);                // 並確定隨機值落在哪個片斷上
                for (int i = 0; i < length; i++) {
                    offset -= okSources.get(i).getConfig().getWeight();                    if (offset < 0) {                        return okSources.get(i);
                    }
                }
            }            // 如果權重相同或權重為0則均等隨機
            return okSources.get(random.nextInt(length));            //int index = Math.abs(random.nextInt()) % okSources.size();
            //return okSources.get(index);
        }
    }
  1. 根據writeType獲取當前writeHost方法:

public PhysicalDatasource getSource() {        switch (writeType) {            //writeType=0,返回當前active的writeHost
            case WRITE_ONLYONE_NODE: {                return writeSources[activedIndex];
            }            //writeType=1,隨機發到一個writeHost
            case WRITE_RANDOM_NODE: {                int index = Math.abs(wnrandom.nextInt()) % writeSources.length;
                PhysicalDatasource result = writeSources[index];                if (!this.isAlive(result)) {                    // find all live nodes
                    ArrayList<Integer> alives = new ArrayList<Integer>(writeSources.length - 1);                    for (int i = 0; i < writeSources.length; i++) {                        if (i != index) {                            if (this.isAlive(writeSources[i])) {
                                alives.add(i);
                            }
                        }
                    }                    if (alives.isEmpty()) {
                        result = writeSources[0];
                    } else {                        // random select one
                        index = Math.abs(wnrandom.nextInt()) % alives.size();
                        result = writeSources[alives.get(index)];

                    }
                }                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("select write source " + result.getName()
                            + " for dataHost:" + this.getHostName());
                }                return result;
            }            //引數不正確
            default: {                throw new java.lang.IllegalArgumentException("writeType is "
                        + writeType + " ,so can't return one write datasource ");
            }
        }

    }


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選




相關文章:
【推薦】 資料庫路由中介軟體MyCat - 使用篇(1)