1. 程式人生 > 實用技巧 >Linkis JDBC模組設計介紹

Linkis JDBC模組設計介紹

目錄

一、背景介紹二、使用介紹(1)引入依賴模組(2)建立測試類三、JDBC模組設計方案(1)驅動類UJESSQLDriver(2)JDBC聯結器UJESSQLConnection(3)執行物件UJESSQLStatement/UJESSQLPreStatement(4)結果集UJESSQLResultSet(5)錯誤碼方案四、實現方案總結五、參考文獻
相關文章分享:Linkis JDBC是如何適配Tableau的?

01

背景介紹


Linkis作為大資料平臺中介軟體,連線了底層的計算儲存和上層的開發應用,統一了任務的排程和執行,在JDBC模組開發出來之前,向Linkis提交SQL執行任務到Spark、Hive執行支援websocket和restful的方式。為了多樣化與Linkis的互動方式,便捷使用者開發流程,輕量化客戶端的任務提交過程,JDBC的支援無疑是非常值得考慮的。
JDBC(Java Data Base Connectivity, java資料庫連線)是一種用於執行SQL語句的Java API,可以為多種關係資料庫提供統一訪問,它由一組用Java語言編寫的類和介面組成。JDBC提供了一種基準,據此可以構建更高階的工具和介面,使資料庫開發人員能夠編寫資料庫應用程式。JDBC提供的主要功能是:1、同一個資料庫建立連線;2、向資料庫傳送SQL語句;3、處理資料庫返回的結果。
在Linkis中,JDBC模組屬於Linkis-UJES下面的一個子模組,UJES即Unified Job Execution Service(統一作業執行服務),UJES是Linkis最初版本的雛形,提供了基礎的任務提交和結果集查詢的對外服務,查詢時JDBC向Linkis-UJES客戶端提交SQL執行,返回得到結果集,使用者在只需要像使用mysql的JDBC那樣操作,大大降低了學習成本,而實現這些功能僅僅需要使用者引入一個Linkis JDBC的JAR包即可。
需要區分的是,Linkis還提供JDBC引擎,例如在DataSphereStudio中可以建立JDBC指令碼通過JDBC引擎提交任務到Linkis,但該引擎僅模擬實現了JDBC的部分方法,並沒有規範性的實現sun提供的JDBC 4.0的完整介面方案,無法向外提供規範的SDK服務。而本文所指的Linkis JDBC模組是實現了一套介面的完整方案。

02

使用介紹

(1)引入依賴模組

第一種方式在pom裡面依賴JDBC模組:

    
    
     
     <dependency>
     
        <groupId>com.webank.wedatasphere.linkis</groupId>
     
        <artifactId>linkis-ujes-jdbc</artifactId>
     
        <version>0.9.1</version>
     
      </dependency>


  
  
   
   
  
  

注意如果引入不到該jar包,需要在ujes/jdbc目錄裡面執行mvn install -Dmaven.test.skip=true進行本地安裝
第二種方式通過打包和編譯:
Step1:在Linkis專案中進入到ujes/jdbc目錄然後在終端輸入指令進行打包mvn assembly:assembly -Dmaven.test.skip=true 該打包指令會跳過單元測試的執行和測試程式碼的編譯,並將JDBC模組需要的依賴一併打包進Jar包之中。Step2:打包完成後在JDBC的target目錄下會生成兩個Jar包,Jar包名稱中包含dependencies字樣的那個就是我們需要的Jar包

(2)建立測試類

建立Java的測試類UJESClientImplTestJ,具體介面含義可以見註釋:


   
   
 
    
    
public static void main(String[] args) throws SQLException, ClassNotFoundException {    //1. 載入驅動類:    Class.forName("com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriver");    //2. 獲得連線:jdbc:linkis://gatewayIP:gatewayPort   帳號和密碼對應前端的帳號密碼    Connection connection =  DriverManager.getConnection("jdbc:linkis://127.0.0.1:9001","username","password");    //3. 建立statement 和執行查詢    Statement st= connection.createStatement();    ResultSet rs=st.executeQuery("show tables");    //4.處理資料庫的返回結果(使用ResultSet類)    while (rs.next()) {        ResultSetMetaData metaData = rs.getMetaData();        for (int i = 1; i <= metaData.getColumnCount(); i++) {            System.out.print(metaData.getColumnName(i) + ":" +metaData.getColumnTypeName(i)+": "+ rs.getObject(i) + "    ");        }        System.out.println();    }    //關閉資源    rs.close();    st.close();    connection.close();}

   
   
 
    
    


   
   
 
    
    2-1 Linkis JDBC
 
    
    任務執行結果

   
   

   
   
 
    
    

03

模組設計方案


Linkis JDBC模組設計的初衷是為了方便使用者通過JDBC的方式,便捷的提交SQL任務到Linkis中執行,是客戶端輕量化追求的一種體現,該模組的類大多以UJESSQL開頭,表示JDBC模組屬於linkis的ujes(Unified Job Execution Service,統一任務執行服務)模組的一部分。
Linkis的JDBC模組包含了五個關鍵的實現類:

  • UJESSQLDriver

  • UJESSQLConnection

  • UJESSQLStatement

  • UJESSQLPreStatement

  • UJESSQLResultSet

以及許多額外的輔助類,例如資料庫元資料UJESSQLDatabaseMetaData,任務執行返回的結果集元資料UJESSQLResultMetaData等。
當UJESSQLDriver通過反射機制註冊到DriverManager後,通過DM可以拿到UJESSQLConnection,接著便可以正常進行SQL任務提交和獲取結果集,下面是結果集獲取時的方法呼叫時序圖:

下面將逐一介紹JDBC關鍵類在Linkis中的實現方案。
(1)驅動類UJESSQLDriver在 JDBC的層次上,sun主要定義了一個介面Driver和兩個類:DirverManager和DriverInfo,每個JDBC驅動程式必須實現 Driver介面,例如MySql的Connector/J驅動中,叫做com.mysql.jdbc.Driver,在Linkis的JDBC中的驅動實現類為UJESSQLDriver。使用時通過Class.forName("com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriver")的方法顯示地讓JVM嘗試載入類,並相應的呼叫靜態程式碼塊完成驅動類的註冊。UJESSQLDriver的主要程式碼如下:

static {    try {        DriverManager.registerDriver(new UJESSQLDriver());    } catch (SQLException e) {        Logger logger = LoggerFactory.getLogger(UJESSQLDriver.class);        logger.info("Load driver failed",e);    }}

通過呼叫DriverManager的註冊方法registerDriver將該驅動類註冊到DriverManager中,當用戶呼叫DriverManager的getConnection時,DriverManager會檢索所有已經註冊的驅動類,並根據驅動類的類名和請求URL中的類名進行對比,尋找出對應的驅動類。

(2)JDBC聯結器UJESSQLConnection

Linkis JDBC中聯結器為UJESSQLConnection,實現了java.sql.connection介面。註冊驅動之後,可以通過傳入指定的資料庫連線路徑,使用者名稱和密碼便可獲取資料庫連線物件。

conn = (UJESSQLConnection) DriverManager.getConnection("jdbc:linkis://hostname:port","username","password")

DriverManager 的getConnection方法將傳入的引數進行處理和轉換,呼叫Driver的connect方法,再將引數傳入UJESSQLConnection的構造器中初始化,返回給使用者。下面是UJESSQLDriver實現的的connect方法:

override def connect(url: String, info: Properties): Connection = if(acceptsURL(url)) {  val props = if(info != null) info else newProperties  props.putAll(parseURL(url))  val ujesClient =UJESClientFactory.getUJESClient(props)  new UJESSQLConnection(ujesClient, props)} else throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url)
private def parseURL(url: String): Properties= { val props = new Properties //add an entry to get url props.setProperty("URL", url) url match { case URL_REGEX(host, port, db, params)=> if(StringUtils.isNotBlank(host))props.setProperty(HOST, host) if(StringUtils.isNotBlank(port))props.setProperty(PORT, port.substring(1)) if(StringUtils.isNotBlank(db)&& db.length > 1) props.setProperty(DB_NAME, db.substring(1)) if(StringUtils.isNotBlank(params)&& params.length > 1) { val _params = params.substring(1) val kvs =_params.split(PARAM_SPLIT).map(_.split(KV_SPLIT)).filter { case Array(USER, value) => props.setProperty(USER, value) false case Array(PASSWORD, value) => props.setProperty(PASSWORD,value) false case Array(key, _) => if(StringUtils.isBlank(key)) { throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url) } else true case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url) } props.setProperty(PARAMS,kvs.map(_.mkString(KV_SPLIT)).mkString(PARAM_SPLIT)) } case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url) } props}


在構造好連線引數之後,會呼叫UJESClientFactory.getUJESClient(prop)方法建立一個新的ujesclient的linkis訪問客戶端,用於提交和查詢linkis任務。

def getUJESClient(props: Properties):UJESClient = {  val host = props.getProperty(HOST)  val port = props.getProperty(PORT)  val serverUrl = if(StringUtils.isNotBlank(port))s"http://$host:$port" else "http://" + host  if(ujesClients.containsKey(serverUrl))ujesClients.get(serverUrl)  else serverUrl.intern synchronized {    if(ujesClients.containsKey(serverUrl))return ujesClients.get(serverUrl)    val ujesClient =createUJESClient(serverUrl, props)    ujesClients.put(serverUrl, ujesClient)    ujesClient  }}

將建立好的ujesclient和資料庫引數props傳入UJESSQLConnection的構造器,最終得到一個完整的UJESSQLConnection物件。

(3)執行物件UJESSQLStatement/UJESSQLPreStatement

執行物件在整個JDBC連線和使用的生命週期中,屬於請求儲存者和執行者的身份,每個JDBC聯結器在生成之後,可以通過呼叫聯結器Connection的createStatement方法獲取執行物件,類似地,也可以通過Connection的prepareStatement方法獲取預執行物件。

//獲取執行物件UJESSQLStatementstatement=(UJESSQLStatementCon)conn.createStatement;//獲取預執行物件UJESSQLPrepareStatementpreStatement=(UJESSQLPrepareStatement)conn.prePareStatement;

UJESSQLStatement中最為重要的方法execute作為提交SQL任務執行的入口,任務提交的執行主流程如下:Step1:呼叫hook修改sql。Step2:生成用於提交linkis任務的action。Step3:利用ujes客戶端提交job到linkis執行。Step4:檢測job的狀態翻轉。Step5:獲取結果集ResultSet。
UJESSQLStatement中提交任務執行的Execute方法的程式碼:

override defexecute(sql: String): Boolean = throwWhenClosed {  var parsedSQL = sql  //預執行hook,轉換不支援的sql語句 JDBCDriverPreExecutionHook.getPreExecutionHooks.foreach{    preExecution =>      parsedSQL = preExecution.callPreExecutionHook(parsedSQL)  }  //獲取linkis的job執行器,建立用於執行的action任務 val action =JobExecuteAction.builder().setEngineType(EngineType.SPARK).addExecuteCode(parsedSQL)   .setCreator(ujesSQLConnection.creator).setUser(ujesSQLConnection.user) if(ujesSQLConnection.variableMap.nonEmpty)action.setVariableMap(JavaConversions.mapAsJavaMap(ujesSQLConnection.variableMap))  //提交SQL任務到ujes客戶端執行  jobExecuteResult =ujesSQLConnection.ujesClient.execute(action.build())  queryEnd = false  //job狀態檢測  var status =ujesSQLConnection.ujesClient.status(jobExecuteResult) val atMost = if(queryTimeout > 0) Duration(queryTimeout,TimeUnit.MILLISECONDS) else Duration.Inf  if(!status.isCompleted)Utils.tryThrow{    Utils.waitUntil(() =>{      status =ujesSQLConnection.ujesClient.status(jobExecuteResult)      status.isCompleted ||closed    }, atMost, 100, 10000)  } {    case t: TimeoutException=>      if(queryTimeout >0) clearQuery()      newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t)    case t => t  }  if(!closed) {    var jobInfo =ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult)    if(status.isFailed)throw new ErrorException(jobInfo.getRequestPersistTask.getErrCode,jobInfo.getRequestPersistTask.getErrDesc)    val jobInfoStatus =jobInfo.getJobStatus   if(!jobInfoStatus.equals("Succeed")) Utils.tryThrow{      Utils.waitUntil(()=> {        jobInfo = ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult)        val state =jobInfo.getJobStatus match{          case"Failed" | "Cancelled" | "Timeout" |"Succeed" => true          case _ => false        }        state || closed      }, atMost, 100, 10000)    } {      case t:TimeoutException =>        if(queryTimeout >0) clearQuery()        newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t)      case t => t    }    //獲取結果集    val resultSetList =jobInfo.getResultSetList(ujesSQLConnection.ujesClient)    queryEnd = true    if(resultSetList !=null) {      resultSet = newUJESSQLResultSet(resultSetList, this, maxRows, fetchSize)      true    } else false  } else throw newUJESSQLException(UJESSQLErrorCode.STATEMENT_CLOSED, "Statement isclosed.")}

同樣的UJESPrepareStatement中的excute方法繼承自Statement,原理一致。
UJESPrepareStatement與UJESStatement的不同之處在於,statement每次執行sql語句,相關資料庫都要執行sql語句的編譯,preparedstatement是預編譯的, 且支援批處理。
(4)結果集UJESSQLResultSet當用戶提交完SQL任務到Linkis後,會檢測使用者的job是否已經完成,完成時呼叫getResultSet的方法獲取結果集UJESSQLResultSet。
在UJESSQLResultSet初次載入的時候,java虛擬機器會呼叫初始化的init()方法,該方法會執行三個初始化的步驟,用於構建結果集:Step1:通過resultSetResultInit方法設定獲取結果集相關的引數,如當前使用者和結果集路徑,然後通過ujesClient拿到結果集。Step2:通過metaDataInit方法獲取結果集的元資料。Step3:通過resultSetInit方法獲取結果集的內容。

private def init(): Unit = {  resultSetResultInit()  metaDataInit()  resultSetInit()}private def resultSetResultInit(): Unit = {  if (path == null) path =getResultSetPath(resultSetList)  val user =connection.getProps.getProperty("user")  if(StringUtils.isNotBlank(path)){    val resultAction =ResultSetAction.builder.setUser(user).setPath(path).build()    resultSetResult =connection.ujesClient.resultSet(resultAction)  }}private def metaDataInit(): Unit = {  if ( null ==resultSetResult ){    return  }  metaData =resultSetResult.getMetadata.asInstanceOf[util.List[util.Map[String, String]]]  for(cursor <- 1 tometaData.size()){    val col =metaData.get(cursor - 1)   resultSetMetaData.setColumnNameProperties(cursor,col.get("columnName"))   resultSetMetaData.setDataTypeProperties(cursor,col.get("dataType"))   resultSetMetaData.setCommentPropreties(cursor,col.get("comment"))  }}private def resultSetInit(): Unit = {  if ( null ==resultSetResult ){    return  }  resultSetRow =resultSetResult.getFileContent.asInstanceOf[util.ArrayList[util.ArrayList[String]]]}


UJESSQLResultSet中的next()方法將currentRowCursor作為移動遊標。每次從結果集中讀取資料後都會相應地更新遊標的位置。如果next方法返回true,則可以呼叫getXXX()方法獲取相關欄位資料,反之則說明當前遊標並未指向一條有效記錄,讀取過程直接結束。

override def next(): Boolean = {  if(metaData == null)init()  currentRowCursor += 1  if(null == resultSetRow ||currentRowCursor > resultSetRow.size()-1) false  else{   updateCurrentRow(currentRowCursor)    true  }}


當next() 方法返回true時,會相應地呼叫getXXX()方法讀取資料。以getString()方法為例:

override def getString(columnIndex: Int): String = {  val any  = getColumnValue(columnIndex)  if(wasNull()) {    throw newUJESSQLException(UJESSQLErrorCode.RESULTSET_ROWERROR, "Type is null")  }else{    any.asInstanceOf[String]  }}


該方法呼叫getColumnValue讀取資料,並將其轉化為String型別的值返回。

(5)錯誤碼方案

為了便於使用者利用Linkis JDBC提交SQL執行,減少錯誤排查時間,我們在Linkis JDBC中對常見的錯誤生成了錯誤碼,儘量覆蓋整個JDBC提交和執行過程。錯誤碼編號範圍初步設定在80000~80100,常見的錯誤型別包括:引數型別錯誤、方法暫不支援、操作邏輯出錯以及返回型別錯誤等。

public enum UJESSQLErrorCode { BAD_URL(80000,"badurl"),  NOSUPPORT_DRIVER(80001,"this method not supported in driver"), NOSUPPORT_CONNECTION(80002,"thismethodnotsupportedinconnection"),  NOSUPPORT_STATEMENT(80003,"this method not supported instatement"),  CONNECTION_CLOSED(80004,"Connection is closed!"),  STATEMENT_CLOSED(80005,"statement is closed!"), SCHEMA_EMPTY(80006,"schemaisempty!"),  SCHEMA_FAILED(80007,"Get schema failed!"),  QUERY_TIMEOUT(80008,"query has been timeout!"),  FILETYPE_ERROR(80009,"file type error"),  METADATATYPE_ERROR(80010,"metadata type error"), NOSUPPORT_METADATA(80011,"thismethodnotsupportedinDatabaseMetaData"),  NOPERMITION(80012,"This user has no permission to read thisfile!"), PARAMS_NOT_FOUND(80013,"Parameternotfound"),  ERRORINFO_FROM_JOBINFO(80014,"get errorinfo from jobInfo"), RESULTSET_ROWERROR(80015,"rowmessageerror"),  NOSUPPORT_RESULTSET(80016,"this method not supported inresultSet"),  RESULTSET_NULL(80017,"resultset is null,try to run next() firstlyto init ResultSet and MetaData"),  PREPARESTATEMENT_TYPEERROR(80018,"parameter type error"),  METADATA_EMPTY(80019,"data is empty")  ;  private String msg;  private int code;
UJESSQLErrorCode(intcode,String msg) { this.code = code; this.msg = msg; }
public String getMsg() { return msg; }
public int getCode() { return code; }}



04

實現方案總結


Linkis JDBC模組設計的初衷是為了使用者能夠方便的通過JDBC的方式提交SQL任務到LInkis執行,在實現的過程中,我們參考了文章[1]進行初步的框架設計,實現過程中對於任務的提交和封裝參考了Linkis ujes中與job相關的文件,閱讀了一些JDBC相關的文章[2][3]。設計的過程中仍有一部分非必要的介面沒有實現,這是參考Kylin、Hive等專案中JDBC模組設計綜合考量後的結果,在不影響使用效果的前提下降低開發成本。
Linkis本身作為大資料產品的聯結器,具有強大的整合和可拓展性,JDBC模組也是Linkis的向外相容的一個具體實現,期待服務於社群一歲多的Linkis能夠茁壯成長,在大家的共同栽培下枝繁葉茂。

05

參考文獻

[1] create-your-own-type-3-jdbc-driverhttps://www.javaworld.com/article/2074249/create-your-own-type-3-jdbc-driver--part-1.html[2] Java JDBC的優雅設計https://blog.csdn.net/yisizhu/article/details/104025220[3] Class.forName載入JDBC驅動程式時,底層都做了些什麼???https://www.cnblogs.com/liuxianan/archive/2012/08/04/2623258.html

WeDataSphere,BIG DATA MADE EASY.

用心做一個有溫度的開源社群

~歡迎關注~


掃碼關注我們

微訊號公眾號 : WeDataSphere

GitHub:WeDataSphere

如果喜歡我們的產品或文章,請給我們的GitHub點上你寶貴的star和fork哦~~

來源:朝陽SEO