
Mark: fixing the problem that a Flink SQL job submitted to the cluster cannot be given a job name

1) First, the code:

package com.rookie.submit.main

import java.io.File

import com.rookie.submit.common.{Common, Constant}
import com.rookie.submit.common.Constant._
import com.rookie.submit.util.{RegisterUdf, SqlFileUtil, TableConfUtil}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, StatementSet}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._

/**
  * sqlSubmit main class
  * takes a sql file name as input and executes its sql content
  */
object SqlSubmit {

  private val logger = LoggerFactory.getLogger("SqlSubmit")

  def main(args: Array[String]): Unit = {
    // parse input parameter and load job properties
    val paraTool = Common.init(args)

    // parse sql file
    val sqlList = SqlFileUtil.readFile(paraTool.get(INPUT_SQL_FILE_PARA))

    // StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // state backend and checkpoint
    enableCheckpoint(env, paraTool)
    // EnvironmentSettings
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // create table environment
    val tabEnv = StreamTableEnvironment.create(env, settings)
    // table Config
    TableConfUtil.conf(tabEnv, paraTool)

    // register the Hive catalog only on the server: File.separator is "/" on Linux, so local (Windows) runs skip this
    if ("/".equals(File.separator)) {
      //      val catalog = new HiveCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME), paraTool.get(Constant.HIVE_DEFAULT_DATABASE), paraTool.get(Constant.HIVE_CONFIG_PATH), paraTool.get(Constant.HIVE_VERSION))
      val catalog = new HiveCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME), paraTool.get(Constant.HIVE_DEFAULT_DATABASE), paraTool.get(Constant.HIVE_CONFIG_PATH))
      tabEnv.registerCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME), catalog)
      tabEnv.useCatalog(paraTool.get(Constant.HIVE_CATALOG_NAME))
    }

    // load udf
    RegisterUdf.registerUdf(tabEnv)

    // execute sql
    val statement = tabEnv.createStatementSet()
    // stays null if the sql file contains no insert statement
    var result: StatementSet = null
    for (sql <- sqlList) {
      try {
        if (sql.startsWith("insert")) {
          // buffer insert statements; they run together as a single job below
          result = statement.addInsertSql(sql)
        } else {
          if (sql.contains("hive_table_")) {
            tabEnv.getConfig().setSqlDialect(SqlDialect.HIVE)
          } else {
            tabEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT)
          }
          logger.info("dialect : " + tabEnv.getConfig.getSqlDialect)
          println("dialect : " + tabEnv.getConfig.getSqlDialect)
          tabEnv.executeSql(sql)
        }
        logger.info("execute success : " + sql)
        println("execute success : " + sql)
      } catch {
        case e: Exception =>
          println("execute sql error : " + sql)
          logger.error("execute sql error : " + sql, e)
          e.printStackTrace()
          System.exit(-1)
      }
    }
    // execute the buffered inserts; execute(jobName) is the overload added
    // to StatementSetImpl by the source patch described below
    if (result != null) {
      result.execute(Common.jobName)
    }
    // result.execute() // stock API: has no job-name parameter
    // env.execute(Common.jobName) is not needed: executeSql and the StatementSet trigger execution themselves
  }

  def enableCheckpoint(env: StreamExecutionEnvironment, paraTool: ParameterTool): Unit = {
    // state backend
    var stateBackend: StateBackend = null
    if ("rocksdb".equals(paraTool.get(STATE_BACKEND))) {
      // second argument enables incremental checkpoints
      stateBackend = new RocksDBStateBackend(paraTool.get(CHECKPOINT_DIR), true)
    } else {
      // second argument enables asynchronous snapshots
      stateBackend = new FsStateBackend(paraTool.get(CHECKPOINT_DIR), true)
    }
    env.setStateBackend(stateBackend)
    // checkpoint: interval and timeout are configured in seconds; the API expects milliseconds, hence * 1000
    env.enableCheckpointing(paraTool.getLong(CHECKPOINT_INTERVAL) * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(paraTool.getLong(CHECKPOINT_TIMEOUT) * 1000)
    // Flink 1.11.0 new feature: Enables unaligned checkpoints
    env.getCheckpointConfig.enableUnalignedCheckpoints()
  }

}
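
For context, SqlFileUtil.readFile is not shown above; the real implementation is in the repo linked at the end. A minimal sketch of such a reader, assuming statements are separated by semicolons and lines starting with "--" are comments (names and behavior here are illustrative, not the repo's actual code):

import scala.io.Source

object SqlFileUtilSketch {
  // read a .sql file, drop "--" comment lines, split into statements on ';'
  def readFile(path: String): List[String] = {
    val source = Source.fromFile(path)
    try {
      source.getLines()
        .map(_.trim)
        .filterNot(line => line.isEmpty || line.startsWith("--"))
        .mkString(" ")
        .split(";")
        .map(_.trim)
        .filter(_.nonEmpty)
        .toList
    } finally {
      source.close()
    }
  }
}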

2) The key is the StatementSet object:

Looking at this class, we find that execute() does not accept a job name as a parameter, so we need to modify the Flink source code:

3) The change is mainly a new method added to the StatementSetImpl implementation (shown as a screenshot in the original post).
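
Since the screenshot is not reproduced here, a rough sketch of what the addition amounts to: in Flink 1.11, StatementSet and StatementSetImpl are Java classes, and execute() builds a job name internally (of the form "insert-into_<sink identifiers>"); the patch adds an overload that takes the name from the caller and threads it down to where that default name is generated. The trait below is only an illustration of the API shape, rendered in Scala to match the rest of the post; the actual diff lives in the repo linked at the end.

import org.apache.flink.table.api.TableResult

// Illustrative sketch of the patched API surface, not the actual Flink source.
trait NamedStatementSet {
  // stock method: runs all buffered inserts under an auto-generated job name
  def execute(): TableResult
  // the added overload: same execution, but with a caller-supplied job name
  def execute(jobName: String): TableResult
}

With such an overload in place, the result.execute(Common.jobName) call in the submitter above compiles against the patched jar, and the job appears in the cluster UI under the configured name.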

4) The patched classes need to be recompiled and repackaged (e.g. rebuild the containing flink-table module with Maven and replace the jar in your Flink distribution); they belong to

package org.apache.flink.table

5) GitHub reference:

https://github.com/springMoon/sqlSubmit/blob/master/src/main/scala/com/rookie/submit/main/SqlSubmit.scala