數據庫路由中間件MyCat - 源代碼篇(8)
此文已由作者張鎬薪授權網易雲社區發布。
歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。
3. 連接模塊
3.5 後端連接
對於後端連接,我們只關心MySQL的。 從後端連接工廠開始MySQLConnectionFactory.java:
public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler, String schema) throws IOException { //DBHost配置 DBHostConfig dsc = pool.getConfig(); //根據是否為NIO返回SocketChannel或者AIO的AsynchronousSocketChannel NetworkChannel channel = openSocketChannel(MycatServer.getInstance() .isAIO()); //新建MySQLConnection MySQLConnection c = new MySQLConnection(channel, pool.isReadNode()); //根據配置初始化MySQLConnection MycatServer.getInstance().getConfig().setSocketParams(c, false); c.setHost(dsc.getIp()); c.setPort(dsc.getPort()); c.setUser(dsc.getUser()); c.setPassword(dsc.getPassword()); c.setSchema(schema); //目前實際連接還未建立,handler為MySQL連接認證MySQLConnectionAuthenticator c.setHandler(new MySQLConnectionAuthenticator(c, handler)); c.setPool(pool); c.setIdleTimeout(pool.getConfig().getIdleTimeout()); //AIO和NIO連接方式建立實際的MySQL連接 if (channel instanceof AsynchronousSocketChannel) { ((AsynchronousSocketChannel) channel).connect( new InetSocketAddress(dsc.getIp(), dsc.getPort()), c, (CompletionHandler) MycatServer.getInstance() .getConnector()); } else { //通過NIOConnector建立連接 ((NIOConnector) MycatServer.getInstance().getConnector()) .postConnect(c); } return c; }
通過NIOConnector建立實際連接的過程與前端連接的建立相似,也是先放在隊列中,之後由NIOConnector去建立連接。
public void postConnect(AbstractConnection c) { connectQueue.offer(c); selector.wakeup(); }public void run() { final Selector tSelector = this.selector; for (;;) { ++connectCount; try { //查看有無連接就緒 tSelector.select(1000L); connect(tSelector); Set<SelectionKey> keys = tSelector.selectedKeys(); try { for (SelectionKey key : keys) { Object att = key.attachment(); if (att != null && key.isValid() && key.isConnectable()) { finishConnect(key, att); } else { key.cancel(); } } } finally { keys.clear(); } } catch (Exception e) { LOGGER.warn(name, e); } } }private void connect(Selector selector) { AbstractConnection c = null; while ((c = connectQueue.poll()) != null) { try { SocketChannel channel = (SocketChannel) c.getChannel(); //註冊OP_CONNECT監聽與後端連接是否真正建立 channel.register(selector, SelectionKey.OP_CONNECT, c); //主動連接 channel.connect(new InetSocketAddress(c.host, c.port)); } catch (Exception e) { c.close(e.toString()); } } }private void finishConnect(SelectionKey key, Object att) { BackendAIOConnection c = (BackendAIOConnection) att; try { if (finishConnect(c, (SocketChannel) c.channel)) { //做原生NIO連接是否完成的判斷和操作 clearSelectionKey(key); c.setId(ID_GENERATOR.getId()); //綁定特定的NIOProcessor以作idle清理 NIOProcessor processor = MycatServer.getInstance() .nextProcessor(); c.setProcessor(processor); //與特定NIOReactor綁定監聽讀寫 NIOReactor reactor = reactorPool.getNextReactor(); reactor.postRegister(c); } } catch (Exception e) { //如有異常,將key清空 clearSelectionKey(key); c.close(e.toString()); c.onConnectFailed(e); } }private boolean finishConnect(AbstractConnection c, SocketChannel channel) throws IOException { if (channel.isConnectionPending()) { channel.finishConnect(); c.setLocalPort(channel.socket().getLocalPort()); return true; } else { return false; } }private void clearSelectionKey(SelectionKey key) { if (key.isValid()) { key.attach(null); key.cancel(); } }
綁定到具體的NIOReactor之後,監聽讀事件。和之前講的前端連接建立過程類似。這次是後端MySQL主動發握手包。這時,讀事件就緒,NIOReactor中的RW線程會調用對應AbstractConnection(這裏是MySQLConnection)的handler的處理方法處理。這裏MySQLConnection中的handler參考工廠方法發現是MySQLConnectionAuthenticator。查看handle方法:
/** * MySQL 4.1版本之前是MySQL323加密,MySQL 4.1和之後的版本都是MySQLSHA1加密,在MySQL5.5的版本之後可以客戶端插件式加密(這個MyCat實現) * @see @http://dev.mysql.com/doc/internals/en/determining-authentication-method.html */@Overridepublic void handle(byte[] data) { try { switch (data[4]) { //如果是OkPacket,檢查是否認證成功 case OkPacket.FIELD_COUNT: HandshakePacket packet = source.getHandshake(); if (packet == null) { //如果為null,證明鏈接第一次建立,處理 processHandShakePacket(data); // 發送認證數據包 source.authenticate(); break; } // 如果packet不為null,處理認證結果 //首先將連接設為已驗證並將handler改為MySQLConnectionHandler source.setHandler(new MySQLConnectionHandler(source)); source.setAuthenticated(true); //判斷是否用了壓縮協議 boolean clientCompress = Capabilities.CLIENT_COMPRESS==(Capabilities.CLIENT_COMPRESS & packet.serverCapabilities); boolean usingCompress= MycatServer.getInstance().getConfig().getSystem().getUseCompression()==1 ; if(clientCompress&&usingCompress) { source.setSupportCompress(true); } //設置ResponseHandler if (listener != null) { listener.connectionAcquired(source); } break; //如果為ErrorPacket,則認證失敗 case ErrorPacket.FIELD_COUNT: ErrorPacket err = new ErrorPacket(); err.read(data); String errMsg = new String(err.message); LOGGER.warn("can‘t connect to mysql server ,errmsg:"+errMsg+" "+source); //source.close(errMsg); throw new ConnectionException(err.errno, errMsg); //如果是EOFPacket,則為MySQL 4.1版本,是MySQL323加密 case EOFPacket.FIELD_COUNT: auth323(data[3]); break; default: packet = source.getHandshake(); if (packet == null) { processHandShakePacket(data); // 發送認證數據包 source.authenticate(); break; } else { throw new RuntimeException("Unknown Packet!"); } } } catch (RuntimeException e) { if (listener != null) { listener.connectionError(e, source); return; } throw e; } }
在連接建立並認證後,MySQLConnectionHandler來處理這個連接的請求和相應。 MySQL服務端響應客戶端查詢請求的流程如下:可以分為三個階段: (第一階段)客戶端發送查詢請求包COM_QUERY (command query packet),如果有結果集返回,且結果集不為空,則返回FieldCount(列數量)包;如果結果集為空,則返回OKPacket;如果命令有錯,則返回ERRPacket;如果是Load file data命令,則返回LOCAL_INFILE_Request。 (第二階段)如果有結果集返回,則先返回列集合,所有列返回完了之後,會返回EOFPacket;如果過程中出現錯誤,則返回錯誤包。 (第三階段)之後返回行記錄,返回全部行記錄之後,返回EOFPacket。如果有錯誤,回錯誤包。 MyCat實現源代碼如下:
protected void handleData(byte[] data) { switch (resultStatus) { //第一階段 case RESULT_STATUS_INIT: switch (data[4]) { //返回OKPacket case OkPacket.FIELD_COUNT: handleOkPacket(data); break; //返回錯誤包 case ErrorPacket.FIELD_COUNT: handleErrorPacket(data); break; //返回Load Data進一步操作 case RequestFilePacket.FIELD_COUNT: handleRequestPacket(data); break; //返回結果集列數量 default: //記錄列數量並進入第二階段 resultStatus = RESULT_STATUS_HEADER; header = data; fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data, 4)); } break; //第二階段 case RESULT_STATUS_HEADER: switch (data[4]) { //返回錯誤包 case ErrorPacket.FIELD_COUNT: resultStatus = RESULT_STATUS_INIT; handleErrorPacket(data); break; //返回EOF,證明列集合返回完畢,進入第三階段 case EOFPacket.FIELD_COUNT: resultStatus = RESULT_STATUS_FIELD_EOF; handleFieldEofPacket(data); break; //返回的是列集合,記錄 default: fields.add(data); } break; //第三階段 case RESULT_STATUS_FIELD_EOF: switch (data[4]) { //返回錯誤包 case ErrorPacket.FIELD_COUNT: resultStatus = RESULT_STATUS_INIT; handleErrorPacket(data); break; //返回EOF,證明結果集返回完畢,回到第一階段等待下一個請求的響應 case EOFPacket.FIELD_COUNT: resultStatus = RESULT_STATUS_INIT; handleRowEofPacket(data); break; //返回結果集包 default: handleRowPacket(data); } break; default: throw new RuntimeException("unknown status!"); } }
免費體驗雲安全(易盾)內容安全、驗證碼等服務
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】 IOS渠道追蹤方式
數據庫路由中間件MyCat - 源代碼篇(8)