1. 程式人生 > >Spark 框架安全認證實現

Spark 框架安全認證實現

導言

隨著大資料叢集的使用,大資料的安全受到越來越多的關注一個安全的大資料叢集的使用,運維必普通的叢集更為複雜。
叢集的安全通常基於kerberos叢集完成安全認證。kerberos基本原理可參考:一張圖瞭解Kerberos訪問流程

Spark應用(On Yarn模式下)在安全的hadoop叢集下的訪問,需要訪問各種各樣的元件/程序,如ResourceManager,NodeManager,NameNode,DataNode,Kafka,Hmaster,HregionServer,MetaStore等等。尤其是在長時執行的應用,如sparkStreaming,StructedStreaming,如何保證使用者認證後的長期有效性,其安全/認證更為複雜。

一個Spark應用提交使用者要先在kdc中完成使用者的認證,及拿到對應service服務的票據之後才能訪問對應的服務。由於Spark應用執行時涉及yarnclient,driver,applicationMaster,executor等多個服務,這其中每個程序都應當是同一個使用者啟動並執行,這就涉及到多個程序中使用同一個使用者的票據來對各種服務進行訪問,本文基於Spark2.3對此做簡要分析。

  • spark應用包含程序
程序 功能 yarn-client模式 yarn-cluster模式
yarnclient Spark應用提交app的模組 yarn-client模式下生命週期與driver一致; yarn-cluster模式下可以設定為app提交後即退出,或者提交後一直監控app執行狀態
driver spark應用驅動器,排程應用邏輯,應用的“大腦” yarn-client模式下執行在yarnclient的JVM中; yarn-cluster模式下執行在applicationMaster中
applicationMaster 基於yarn服務抽象出的app管理者 yarn-client模式下僅僅負責啟動/監控container,彙報應用狀態的功能; yarn-cluster模式下負責啟動/監控container,彙報應用狀態的功,同時包含driver功能
Executor spark應用的執行器,yarn應用的container實體,業務邏輯的實際執行者

spark應用的提交使用者認證之後才能提交應用,所以在yarnclient/driver的邏輯中必然會執行到kerberos認證相關的登入認證。然而其他的程序如applicationMaster,executor等均需要經過認證,應用提交後才由使用者啟動,這些程序則可以不進行kerberos認證而是利用Hadoop的token機制完成認證,減小kerberos服務壓力,同時提高訪問效率。

  • Hadoop Token機制

Hadoop的token實現基類為org.apache.hadoop.security.token.Token,

/**
   * Construct a token from the components.
   * @param identifier the token identifier
   * @param password the token's password
   * @param kind the kind of token
   * @param service the service for this token
   */
  public Token(byte[] identifier, byte[] password, Text kind, Text service) {
    this.identifier = identifier;
    this.password = password;
    this.kind = kind;
    this.service = service;
  }

不同的服務也可hadoop的token來互動,只要使用不同的identifer來區分token即可。 如NMTokenIdentifier, AMRMTokenIdentifier,AuthenticationTokenIdentifier等不同的tokenIdentifier來區分不同的服務型別的token。

Spark應用各程序的安全實現

yarnclient的實現

此處yarnclient指的是向ResourceManager提交yarn應用的客戶端。在spark中,向yarn提交應用有兩種應用有yarn-client,yarn-cluster模式。在這兩種應用模式下提交應用,yarn client邏輯有些許不同。

安全hadoop場景下spark的使用者登入認證機制

  • spark提交應用時,通過--principal, --keytab引數傳入認證所需檔案。
    在sparkSubmit中prepareSubmitEnvironment時,完成認證

     // assure a keytab is available from any place in a JVM
     if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
       if (args.principal != null) {
         if (args.keytab != null) {
           require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
           // Add keytab and principal configurations in sysProps to make them available
           // for later use; e.g. in spark sql, the isolated class loader used to talk
           // to HiveMetastore will use these settings. They will be set as Java system
           // properties and then loaded by SparkConf
           sparkConf.set(KEYTAB, args.keytab)
           sparkConf.set(PRINCIPAL, args.principal)
           UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
         }
       }
     }
    
  • 在yarn-cluster模式下,不會呼叫業務層程式碼,即不會初始化SparkContext,其通過YarnClusterApplication的start方法呼叫client.submitApplication提交應用

  • 在yarn-client模式下,會在yarnclient邏輯中呼叫業務程式碼,即會初始化並執行SparkContext,通過YarnClientSchedulerBackend其排程client.submitApplication提交應用。

在client的submitApplication方法中提交app,之後建立amContext,準備本地資源,此時會將本地的檔案上傳至HDFS,其中就包括keytab檔案,同時會生成spark_conf.properties配置檔案以供am使用,該配置檔案中會包含keytab的配置

 val props = new Properties()
  sparkConf.getAll.foreach { case (k, v) =>
    props.setProperty(k, v)
  }
  // Override spark.yarn.key to point to the location in distributed cache which will be used
  // by AM.
  Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) }

其中的amKeytabFileName是在setUpCredentials時設定如下,該值為指定的keytab檔案加上隨機的字串字尾,騎在am重點使用,可參考下節的介紹。

val f = new File(keytab)
  // Generate a file name that can be used for the keytab file, that does not conflict
  // with any user file.
  amKeytabFileName = f.getName + "-" + UUID.randomUUID().toString
  sparkConf.set(PRINCIPAL.key, principal)

獲取相關元件的token,注意:此處的token均非與yarn服務互動相關token,這裡只有與HDFS,HBASE,Hive服務互動的token。

 def obtainDelegationTokens(
  hadoopConf: Configuration,
  creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
  if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
  // 各provider的obtainDelegationTokens方法中,會獲取對應元件的token,並放入credentials中
    provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
  } else {
    logDebug(s"Service ${provider.serviceName} does not require a token." +
      s" Check your configuration to see if security is disabled or not.")
    None
  }
}.foldLeft(Long.MaxValue)(math.min)

}

Spark中常訪問的服務使用token機制的有hive,hbase,hdfs,對應的tokenProvider如下:

服務 tokenProvider token獲取類 token獲取方法
HDFS HadoopFSDelegationTokenProvider org.apache.hadoop.hbase.security.token.TokenUtil obtainToken
HIVE HiveDelegationTokenProvider org.apache.hadoop.hive.ql.metadata getDelegationToken
HBASE HBaseDelegationTokenProvider org.apache.hadoop.hdfs.DistributedFileSystem addDelegationTokens

以HbaseDelegationTokenProvider為例,主要是通過反射呼叫hbase的TokenUtil類的obtainTOken方法,對應的obtainDelegationTokens方法如下:

override def obtainDelegationTokens(
  hadoopConf: Configuration,
  sparkConf: SparkConf,
  creds: Credentials): Option[Long] = {
try {
  val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
  val obtainToken = mirror.classLoader.
    loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
    getMethod("obtainToken", classOf[Configuration])
  logDebug("Attempting to fetch HBase security token.")
  val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
    .asInstanceOf[Token[_ <: TokenIdentifier]]
  logInfo(s"Get token from HBase: ${token.toString}")
  creds.addToken(token.getService, token)
} catch {
  case NonFatal(e) =>
    logDebug(s"Failed to get token from service $serviceName", e)
}
None
}

PS : HBase的token獲取的使用者需要具有hbase:meta表的exec許可權,否則無法成功獲取token

在獲取token後,將token設定到amContainer中,並放入appContext中

private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
amContainer.setTokens(ByteBuffer.wrap(dob.getData))
}
//
appContext.setAMContainerSpec(containerContext)

driver的token更新

在yarn-client模式下,driver在yarnclient程序中啟動,同樣需要訪問業務層及叢集的相關元件如hdfs。driver通過讀取am更新在hdfs路徑下的credentials檔案來保證driver節點的token有效。

// SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
// reads the credentials from HDFS, just like the executors and updates its own credentials
// cache.
if (conf.contains("spark.yarn.credentials.file")) {
    YarnSparkHadoopUtil.startCredentialUpdater(conf)
} 

在yarn-cluster模式下,driver執行在applicationMaster的JVM中,其安全相關由Am同一操作

ApplicationMaster 的安全認證

applicationMaster是Yarn進行應用排程/管理的核心,需要與RM/NM等進行互動以便應用執行。其中相關的互動均通過token完成認證,認證實現由Yarn內部框架完成。檢視am日誌發現,即是在非安全(非kerberos)的場景下,同樣會使用到token。而與hdfs,hbase等服務互動使用的token則需Spark框架來實現。

applicationMaster中與YARN相關的認證

  • AM與RM的認證

在ResourceManager接收到應用提交的ApplicationSubmissionContext後,在其AmLauncher.java的run方法中為am設定生成“YARN_AM_RM_TOKEN,該token用於am於rm通訊使用”

 public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
  ApplicationAttemptId appAttemptId) {
this.writeLock.lock();
try {
  LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
  AMRMTokenIdentifier identifier =
      new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
        .getKeyId());
  byte[] password = this.createPassword(identifier);
  appAttemptSet.add(appAttemptId);
  return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
    identifier.getKind(), new Text());
} finally {
  this.writeLock.unlock();
}
}
  • AM與NM的認證

Am在啟動之後,會向ResourceManager申請container,並與對應的NodeManager通訊以啟動container。然而AM與NM通訊的token是如何得到的呢?

檢視AMRMClientImpl類可以看到,AM向RM傳送分配請求,RM接收到請求後,將container要分配至的NM節點的Token放置response中返回給AM。Am接收到response後,會儲存NMToken,並判定是否需要更新YARN_AM_RM_TOKEN

//通過rmClient向RM傳送分配請求
allocateResponse = rmClient.allocate(allocateRequest);
//拿到response後,儲存NMToken並根據response判定是否需要更新AMRM通訊的TOken
if (!allocateResponse.getNMTokens().isEmpty()) {
      populateNMTokens(allocateResponse.getNMTokens());
    }
    if (allocateResponse.getAMRMToken() != null) {
      updateAMRMToken(allocateResponse.getAMRMToken());
    }

RM通過ApplicationMasterService響應allocation請求

// 通過排程器為cotnainer分配NodeManager並生成該NodeManager的Token放入allcation中
 Allocation allocation =
      this.rScheduler.allocate(appAttemptId, ask, release, 
          blacklistAdditions, blacklistRemovals);
 ......
  if (!allocation.getContainers().isEmpty()) {
    allocateResponse.setNMTokens(allocation.getNMTokens());
  }

AM在準備啟動container時,將當前使用者的token都設定進ContainerLaunchContext中

def startContainer(): java.util.Map[String, ByteBuffer] = {
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
  .asInstanceOf[ContainerLaunchContext]
val env = prepareEnvironment().asJava
ctx.setLocalResources(localResources.asJava)
ctx.setEnvironment(env)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))

ApplicationMaster業務相關的服務的token更新

Am啟動的資源情況

檢視Am啟動命令大致如下,可以發現有指定配置檔案,而該配置檔案即為yarnclient生成上傳至hdfs,在am啟動前由NodeManager從hdfs中copy至本地路徑,供container使用:

 /usr/jdk64/jdk1.8.0_77//bin/java -server -Xmx512m -Djava.io.tmpdir=/localpath/*/tmp -Dspark.yarn.app.container.log.dir=/localpath/*/ org.apache.spark.deploy.yarn.ExecutorLauncher --arg host:port --properties-file /localpath/*/__spark_conf__/__spark_conf__.properties

檢視此配置檔案可以看到有如下配置項:

[email protected]
spark.yarn.keytab=hbase.headless.keytab-18f29b79-b7a6-4cb2-b79d-4305432a5e9a

下圖為am程序使用到的資原始檔

 

am程序資源.jpg

如上可以看出,am雖然執行在叢集中,但執行時認證相關的資源已經準備就緒。下面分析其執行中關於安全的邏輯

Am安全認證及token更新邏輯

在applicationMaster中,定期更新token,並寫入檔案到hdfs的相關目錄,並清理舊檔案以供各executor使用。
在ApplicationMaster啟動後,進行login登入並啟動名為am-kerberos-renewer的dameon執行緒定期登入,保證使用者認證的有效性

private val ugi = {
val original = UserGroupInformation.getCurrentUser()

// If a principal and keytab were provided, log in to kerberos, and set up a thread to
// renew the kerberos ticket when needed. Because the UGI API does not expose the TTL
// of the TGT, use a configuration to define how often to check that a relogin is necessary.
// checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed.
val principal = sparkConf.get(PRINCIPAL).orNull
val keytab = sparkConf.get(KEYTAB).orNull
if (principal != null && keytab != null) {
  UserGroupInformation.loginUserFromKeytab(principal, keytab)

  val renewer = new Thread() {
    override def run(): Unit = Utils.tryLogNonFatalError {
      while (true) {
        TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))
        UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()
      }
    }
  }
  renewer.setName("am-kerberos-renewer")
  renewer.setDaemon(true)
  renewer.start()

  // Transfer the original user's tokens to the new user, since that's needed to connect to
  // YARN. It also copies over any delegation tokens that might have been created by the
  // client, which will then be transferred over when starting executors (until new ones
  // are created by the periodic task).
  val newUser = UserGroupInformation.getCurrentUser()
  SparkHadoopUtil.get.transferCredentials(original, newUser)
  newUser
} else {
  SparkHadoopUtil.get.createSparkUser()
}
}

在am中啟動AMCredentialRenewerStarter執行緒,排程認證登入及token renew邏輯

if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
        val credentialRenewerThread = new Thread {
          setName("AMCredentialRenewerStarter")
          setContextClassLoader(userClassLoader)
      override def run(): Unit = {
        val credentialManager = new YARNHadoopDelegationTokenManager(
          sparkConf,
          yarnConf,
          conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
        val credentialRenewer =
          new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
        credentialRenewer.scheduleLoginFromKeytab()
      }
    }
    credentialRenewerThread.start()
    credentialRenewerThread.join()
  }

在scheduleLoginFromKeytab中,會週期排程登入,token獲取更新寫入hdfs檔案等操作。
其核心邏輯如下

排程週期:

各種元件的token更新週期如hdfs的更新週期dfs.namenode.delegation.token.renew-interval預設為1天,hbase的token更新週期hbase.auth.key.update.interval預設為1天;排程更新的週期為如上各元件最小值的75%,

排程流程:

//將生成的token寫入hdfs目錄${spark.yarn.credentials.file}-${timeStamp}-${nextSuffix}
writeNewCredentialsToHDFS(principal, keytab)
//刪除邏輯為保留五個(${spark.yarn.credentials.file.retention.count})檔案,檔案更新時間早於五天(${spark.yarn.credentials.file.retention.days})的全部清理
cleanupOldFiles()

Executor的認證機制

executor的認證同樣使用的是token機制。executor啟動之後,根據driver啟動設定的${spark.yarn.credentials.file}啟動token更新:

if (driverConf.contains("spark.yarn.credentials.file")) {
    logInfo("Will periodically update credentials from: " +
      driverConf.get("spark.yarn.credentials.file"))
    Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
      .getMethod("startCredentialUpdater", classOf[SparkConf])
      .invoke(null, driverConf)
  }

Executor中的token更新是讀取hdfs目錄{spark.yarn.credentials.file}-{timeStamp}-${nextSuffix}目錄下的檔案,讀取到快取中,以便保證讀取到的是更新後的token使用。

安全Spark的使用

Spark框架完成的kerberos認證及使用token與其他服務互動的機制使用較為簡單,只需要在提交應用時的spark-submit命令列中加入--principal appuserName --keytab /path/to/user.keytab即可