資料庫路由中介軟體MyCat - 原始碼篇(9)
此文已由作者張鎬薪授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
3. 連線模組
3.5 後端連線
3.5.1 後端連接獲取與負載均衡
上一節我們講了後端連線的基本建立和響應處理,那麼這些後端連線是什麼時候建立的呢? 首先,MyCat配置檔案中,DataHost標籤中有minIdle這個屬性。代表在MyCat初始化時,會在這個DataHost上初始化維護多少個連線(這些連線可以理解為連線池)。每個前端Client連線會建立Session,而Session會根據命令的不同而建立不同的Handler。每個Handler會從連線池中拿出所需要的連線並使用。在連線池大小不夠時,RW執行緒會非同步驅使新建所需的連線補充連線池,但是連線數最大不能超過配置的maxCon。同時,如之前所述,有定時執行緒檢查並回收空閒後端連線。但池中最小不會小於minCon。 我們可以通過後端連線的工廠方法的呼叫鏈來理解:
st=>start: MyCat接受客戶端連線併為之建立唯一繫結的Session e=>end: 將請求傳送給對應連線,處理完之後歸還連線 op1=>operation: MyCat接受客戶端的請求,計算路由 op2=>operation: 根據請求和路由建立合適的handler,這裡為SingleNodeHandler op3=>operation: 從PhysicalDBNode中獲取後端連線 cond=>condition: 嘗試獲取連線,連線夠用? op4=>operation: 嘗試非同步建立新的連線 op5=>operation: 通過DelegateResponseHandler將連線與之前的Handler,這裡是SingleNodeHandler繫結 st->op1->op2->op3->condcond(yes)->econd(no)->op4->op5->e
我們先從Session看起,在MyCat中實現類為NonBlockingSession。在前端連線建立時,會建立繫結唯一的Session: ServerConnectionFactory.java:
protected FrontendConnection getConnection(NetworkChannel channel) throws IOException { SystemConfig sys = MycatServer.getInstance().getConfig().getSystem(); ServerConnection c = new ServerConnection(channel); MycatServer.getInstance().getConfig().setSocketParams(c, true); c.setPrivileges(MycatPrivileges.instance()); c.setQueryHandler(new ServerQueryHandler(c)); c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c)); // c.setPrepareHandler(new ServerPrepareHandler(c)); c.setTxIsolation(sys.getTxIsolation()); //建立繫結唯一Session c.setSession2(new NonBlockingSession(c)); return c; }
Session主要處理事務,多節點轉發協調等,由不同的ResponseHandler實現; 這些ResponseHandler我們之後會在對應的模組去細細分析。這裡先跳過。 檢視SingleNodeHanlder的處理方法 SingleNodeHanlder.java:
public void execute() throws Exception { //從這裡開始計算處理時間 startTime=System.currentTimeMillis(); ServerConnection sc = session.getSource(); this.isRunning = true; this.packetId = 0; final BackendConnection conn = session.getTarget(node); //之前是否獲取過Connection並且Connection有效 if (session.tryExistsCon(conn, node)) { _execute(conn); } else { // create new connection MycatConfig conf = MycatServer.getInstance().getConfig(); //從config中獲取DataNode PhysicalDBNode dn = conf.getDataNodes().get(node.getName()); //獲取對應的資料庫連線 dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this, node); } }
從PhysicalDBNode中獲取合適的連線:
public void getConnection(String schema,boolean autoCommit, RouteResultsetNode rrs, ResponseHandler handler, Object attachment) throws Exception { checkRequest(schema); //檢查資料庫連線池是否初始化成功,因為有reload命令 if (dbPool.isInitSuccess()) { //根據是否能在讀節點上執行獲取連線,一般是判斷是否為讀請求,並且讀請求不在事務中 if (rrs.canRunnINReadDB(autoCommit)) { //根據負載均衡策略選擇合適的後端連線 dbPool.getRWBanlanceCon(schema,autoCommit, handler, attachment, this.database); } else { //直接選擇當前連線池中的的後端連線 dbPool.getSource().getConnection(schema,autoCommit, handler, attachment); } } else { throw new IllegalArgumentException("Invalid DataSource:" + dbPool.getActivedIndex()); } }
PhysicalDBPool類中有負載均衡,切換writeHost,控制write方式等(分別對應balance,writeType等標籤)的實現。首先我們看如果有負載均衡策略(配置了balance)的獲取連線的方式:
public void getRWBanlanceCon(String schema, boolean autocommit, ResponseHandler handler, Object attachment, String database) throws Exception { PhysicalDatasource theNode = null; ArrayList<PhysicalDatasource> okSources = null; switch (banlance) { //所有讀寫節點參與read請求的負載均衡,除了當前活躍的寫節點,balance=1 case BALANCE_ALL_BACK: { //返回所有寫節點和符合條件的讀節點,不包括當前的寫節點 okSources = getAllActiveRWSources(true, false, checkSlaveSynStatus()); if (okSources.isEmpty()) { //如果結果即為空,返回當前寫節點 theNode = this.getSource(); } else { //不為空,隨機選一個 theNode = randomSelect(okSources); } break; } //所有讀寫節點參與read請求的負載均衡,balance=2 case BALANCE_ALL: { //返回所有寫節點和符合條件的讀節點 okSources = getAllActiveRWSources(true, true, checkSlaveSynStatus()); //隨機選一個 theNode = randomSelect(okSources); break; } case BALANCE_ALL_READ: { //返回所有符合條件的讀節點 okSources = getAllActiveRWSources(false, false, checkSlaveSynStatus()); //隨機取一個 theNode = randomSelect(okSources); break; } //不做負載均衡,balance=0或其他不為以上的值 case BALANCE_NONE: default: // return default write data source theNode = this.getSource(); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("select read source " + theNode.getName() + " for dataHost:" + this.getHostName()); } theNode.getConnection(schema, autocommit, handler, attachment); }
其中涉及到的方法:
返回符合條件節點集:
private ArrayList<PhysicalDatasource> getAllActiveRWSources( boolean includeWriteNode, boolean includeCurWriteNode, boolean filterWithSlaveThreshold) { int curActive = activedIndex; ArrayList<PhysicalDatasource> okSources = new ArrayList<PhysicalDatasource>(this.allDs.size()); //判斷寫節點 for (int i = 0; i < this.writeSources.length; i++) { PhysicalDatasource theSource = writeSources[i]; //判斷寫節點是否是active,可能reload會置為inactive,可能多個寫節點但是隻有一個是活躍在用的(writeType=0) if (isAlive(theSource)) { //負載均衡策略是否包含寫節點 if (includeWriteNode) { //判斷是否包含當前活躍的寫入節點 if (i == curActive && includeCurWriteNode == false) { // not include cur active source } else if (filterWithSlaveThreshold) { //如果包含從節點同步延遲限制,檢查同步狀態 if (canSelectAsReadNode(theSource)) { okSources.add(theSource); } else { //如果同步狀態不對,則不新增這個寫節點 continue; } } else { okSources.add(theSource); } } //檢查theSource對應的讀節點 if (!readSources.isEmpty()) { // 檢查theSource對應的讀節點(從節點) PhysicalDatasource[] allSlaves = this.readSources.get(i); if (allSlaves != null) { for (PhysicalDatasource slave : allSlaves) { if (isAlive(slave)) { //如果包含從節點同步延遲限制,檢查同步狀態 if (filterWithSlaveThreshold) { if (canSelectAsReadNode(slave)) { //如果同步狀態正確,則把讀節點加入 okSources.add(slave); } else { continue; } } else { okSources.add(slave); } } } } } } else { // TODO : add by zhuam // 如果寫節點不OK, 也要保證臨時的讀服務正常 if (this.dataHostConfig.isTempReadHostAvailable()) { if (!readSources.isEmpty()) { // check all slave nodes PhysicalDatasource[] allSlaves = this.readSources.get(i); if (allSlaves != null) { for (PhysicalDatasource slave : allSlaves) { if (isAlive(slave)) { if (filterWithSlaveThreshold) { if (canSelectAsReadNode(slave)) { okSources.add(slave); } else { continue; } } else { okSources.add(slave); } } } } } } } } return okSources; }
檢查是否判斷主從延遲:
private boolean checkSlaveSynStatus() { return (dataHostConfig.getSlaveThreshold() != -1) && (dataHostConfig.getSwitchType() == DataHostConfig.SYN_STATUS_SWITCH_DS); }
隨機選擇節點:
/** * TODO: modify by zhuam * <p/> * 隨機選擇,按權重設定隨機概率。 * 在一個截面上碰撞的概率高,但呼叫量越大分佈越均勻,而且按概率使用權重後也比較均勻,有利於動態調整提供者權重。 * * @param okSources * @return */ public PhysicalDatasource randomSelect(ArrayList<PhysicalDatasource> okSources) { if (okSources.isEmpty()) { return this.getSource(); } else { int length = okSources.size(); // 總個數 int totalWeight = 0; // 總權重 boolean sameWeight = true; // 權重是否都一樣 for (int i = 0; i < length; i++) { int weight = okSources.get(i).getConfig().getWeight(); totalWeight += weight; // 累計總權重 if (sameWeight && i > 0 && weight != okSources.get(i - 1).getConfig().getWeight()) { // 計算所有權重是否一樣 sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // 如果權重不相同且權重大於0則按總權重數隨機 int offset = random.nextInt(totalWeight); // 並確定隨機值落在哪個片斷上 for (int i = 0; i < length; i++) { offset -= okSources.get(i).getConfig().getWeight(); if (offset < 0) { return okSources.get(i); } } } // 如果權重相同或權重為0則均等隨機 return okSources.get(random.nextInt(length)); //int index = Math.abs(random.nextInt()) % okSources.size(); //return okSources.get(index); } }
根據writeType獲取當前writeHost方法:
public PhysicalDatasource getSource() { switch (writeType) { //writeType=0,返回當前active的writeHost case WRITE_ONLYONE_NODE: { return writeSources[activedIndex]; } //writeType=1,隨機發到一個writeHost case WRITE_RANDOM_NODE: { int index = Math.abs(wnrandom.nextInt()) % writeSources.length; PhysicalDatasource result = writeSources[index]; if (!this.isAlive(result)) { // find all live nodes ArrayList<Integer> alives = new ArrayList<Integer>(writeSources.length - 1); for (int i = 0; i < writeSources.length; i++) { if (i != index) { if (this.isAlive(writeSources[i])) { alives.add(i); } } } if (alives.isEmpty()) { result = writeSources[0]; } else { // random select one index = Math.abs(wnrandom.nextInt()) % alives.size(); result = writeSources[alives.get(index)]; } } if (LOGGER.isDebugEnabled()) { LOGGER.debug("select write source " + result.getName() + " for dataHost:" + this.getHostName()); } return result; } //引數不正確 default: { throw new java.lang.IllegalArgumentException("writeType is " + writeType + " ,so can't return one write datasource "); } } }
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 資料庫路由中介軟體MyCat - 使用篇(1)