Spark 1.5.2 on yarn升級問題總結
1 升級背景
standlone 生產叢集運行了半年,出現資源瓶頸;另外多使用者資源管理問題也凸顯,將spark 遷移到 yarn 上面是目前比較理想的方案。
spark on yarn 有如下兩個優點:
- 充分使用叢集資源,方便多使用者資源管理;
- 擴容更為方便;
2 遇到問題
1) 程式碼使用system.exit(-1)結果卻顯示正常
測試程式碼:
def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HiveTest") val initConf = SparkConstant.initConf(sparkConf) val sc = new SparkContext(initConf) sc.parallelize(1 to 2000000,4).map(x=>x%0.24).map{ x=> x+0.1 System.exit(-1) sc.stop() }
任務退出後applicationmaster卻顯示任務成功:
某個別業務當程式遇到的異常的時候,直接使用System.exit(-1)退出程式,出現了上面的情況。
appmaster日誌分析:
16/04/08 11:07:29 INFO storage.MemoryStore: MemoryStore cleared 16/04/08 11:07:29 INFO storage.BlockManager: BlockManager stopped 16/04/08 11:07:29 INFO storage.BlockManagerMaster: BlockManagerMaster stopped 16/04/08 11:07:29 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 16/04/08 11:07:29 INFO spark.SparkContext: Successfully stopped SparkContext 16/04/08 11:07:29 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.) 16/04/08 11:07:29 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 16/04/08 11:07:29 INFO yarn.ApplicationMaster: <span style="color:#ff6666;">Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)</span> 16/04/08 11:07:29 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 16/04/08 11:07:29 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered. 16/04/08 11:07:29 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down. 16/04/08 11:07:29 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1460021101912_10211
如日誌顯示:ApplicationMaster提示任務被提前stop,但是為什麼顯示退出success了?下面我們分析相關原始碼:
if (!finished) { // This happens when the user application calls System.exit(). We have the choice // of either failing or succeeding at this point. We report success to avoid // retrying applications that have succeeded (System.exit(0)), which means that // applications that explicitly exit with a non-zero status will also show up as // succeeded in the RM UI. finish(finalStatus, ApplicationMaster.EXIT_SUCCESS, "Shutdown hook called before final status was reported.") } final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = { synchronized { if (!finished) { val inShutdown = ShutdownHookManager.inShutdown() logInfo(s"Final app status: $status, exitCode: $code" + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) exitCode = code finalStatus = status finalMsg = msg finished = true if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { logDebug("shutting down reporter thread") reporterThread.interrupt() } if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) { logDebug("shutting down user thread") userClassThread.interrupt() } if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop()) } } }
如果異常退出,將ApplicationMaster的exist code設定為0,也就是正常退出。我們看看這樣做的原因,如上面說明解釋,始終顯示success的原因是防止applicationmaster被重試,導致任務失敗會再次提交。
解決辦法:
- 直接選擇丟擲異常;如果讀者選擇丟擲異常的話,applicationmaster會選擇下面程式碼:
case e: Throwable => {
failureCount += 1
if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"$failureCount time(s) from Reporter thread.")
直接將該任務置為錯誤狀態,但是會導致任務重試。
- 判斷任務成功的標誌應該是exitcode為0 並且Diagnostics不顯示Shutdownhook called before final status was reported;
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HiveTest")
val initConf = SparkConstant.initConf(sparkConf)
val sc = new SparkContext(initConf)
sc.parallelize(1 to 2000000,4).map(x=>x%0.24).map{ x=> x+0.1
}.reduce(_+_)
val e = new Exception("this is my exception")
throw e
}
}
2) driver、executor PermGen Space oom
在計算過程中,特別是載入hive或者HBase第三方packages的情況下,出現driver、executor大量的PermGenSpace oom。spark on yarn和standlone一樣,需要配置driver、executor的jvm相關引數。目前我們的配置是:spark.driver.extraJavaOptions -XX:MaxPermSize=512m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:GCTimeLimit=5 -XX:GCHeapFreeLimit=95
spark.executor.extraJavaOptions -XX:MaxPermSize=512m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:GCTimeLimit=5 -XX:GCHeapFreeLimit=95
- -XX:MaxPermSize=512m:增加PermGen Space大小,預設是128M;會發生PermGenSpace oom;
- -XX:+PrintGCDetails-XX:+PrintGCTimeStamps:列印GC日誌stdout日誌,方便觀察計算過程中的GC情況和記憶體使用情況;
- -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:GCTimeLimit=5 -XX:GCHeapFreeLimit=95:修改GC策略,目前修改為CMS 策略,後面準備嘗試G1策略;
3) 自定義mysql驅動導致讀寫資料失敗問題
我們對mysql驅動進行了封裝,為了保證內部資料安全。在1.4.0的使用方式如下:
val sqlContext = new SQLContext(sc)
DriverManager.registerDriver(new CBTMysqlDriver)
val props = new java.util.TreeMap[String, String]
props.put("url", "jdbc:CBTMysqlDriver://*******")
props.put("dbtable", "mysql.user")//database.tablename
props.put("driver", "CBTMysqlDriver")
val df2: DataFrame = sqlContext.read.format("jdbc").options(props).load()
val list = df2.collect()
list.foreach(x => println(x))
sc.stop()
在1.4.0能夠正常從mysql讀取資料,但是遷移到1.5.2以後出現無法讀寫mysql資料。分析原因以後發現如下程式碼:
/**
* :: DeveloperApi ::
* Default mysql dialect to read bit/bitsets correctly.
*/
@DeveloperApi
case object MySQLDialect extends JdbcDialect {
override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
// This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
// byte arrays instead of longs.
md.putLong("binarylong", 1)
Some(LongType)
} else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
Some(BooleanType)
} else None
}
override def quoteIdentifier(colName: String): String = {
s"`$colName`"
}
在讀取mysql 資料之前,driver需要讀取指定表的schema,在讀取的時候需要選擇相應的驅動,選擇的方法是:
overridedef canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
這樣就導致我們自己封裝的mysql 驅動無法找到。
解決方法是實現自己的mysql驅動Dialect類:
class MySQLDialect extends JdbcDialect {
override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
// This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
// byte arrays instead of longs.
md.putLong("binarylong", 1)
Some(LongType)
} else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
Some(BooleanType)
} else None
}
override def quoteIdentifier(colName: String): String = {
s"`$colName`"
}
}
case object CBTMySQLDialect extends MySQLDialect{
override def canHandle(url : String): Boolean = url.startsWith("jdbc:CBTMysqlDriver")
}
4) hive table name命名不規範
個別業務的hivetable名稱中間存在點號,比如mydatabase.my.table;在spark 1.5.2程式碼裡面對此進行了強制檢查,具體程式碼如下:/**
* It is not allowed to specifiy database name for tables stored in [[SimpleCatalog]].
* We use this method to check it.
*/
protected def checkTableIdentifier(tableIdentifier: Seq[String]): Unit = {
if (tableIdentifier.length > 1) {
throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
"for temporary tables. If the table name has dots (.) in it, please quote the " +
"table name with backticks (`).")
}
}
}
命令規範問題需要重視。
5) hive_metastore ConnectionPassword加密
spark 連線hive metastore需要使用hive-site.xml,該配置檔案給使用者暴露了連線的metastore的使用者名稱和密碼,這樣會導致兩個問題:
1) 瞭解spark的使用者能夠獲取到metastore 資料庫的密碼;
2) 任意使用者在獲取hive-site.xml後使用我們規定版本外的spark jar包提交spark任務到叢集;
基於上面兩點,我們在程式碼裡面對metastore 資料庫的密碼的密碼加密;通過閱讀下面原始碼:
protected[hive] lazy val metadataHive: ClientInterface = {
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
// We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
// into the isolated client loader
val metadataConf = new HiveConf()
val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
logInfo("default warehouse location is " + defaultWarehouseLocation)
// `configure` goes second to override other settings.
val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap ++ configure
.........
logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using $jars")
new IsolatedClientLoader(
version = metaVersion,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
}
isolatedLoader.client
metadataHive 通過HiveConf()載入系統hive-site.xml,然後將metadataConf傳輸給allConfig變數,會通過IsolatedClientLoader創建於metastore連線的state變數,只需要獲取metadataConf裡面的HiveConf.ConfVars.METASTOREPWD.varname變數,然後對其解密,再改變HiveConf()HiveConf.ConfVars.METASTOREPWD.varname即可。加密程式碼如下:
protected[hive] lazy val metadataHive: ClientInterface = {
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
// We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
// into the isolated client loader
val metadataConf = new HiveConf()
// added by Ricky
val passwd=metadataConf.get(HiveConf.ConfVars.METASTOREPWD.varname)
val passWord = PasswdDecrypt(passwd.toString)//加密模組,自己選擇加密演算法。
metadataConf.set(HiveConf.ConfVars.METASTOREPWD.varname,passWord)
hiveconf.set(HiveConf.ConfVars.METASTOREPWD.varname,passWord)//重置全域性密碼
// end by Ricky
val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
logInfo("default warehouse location is " + defaultWarehouseLocation)
6) spark sql 不支援hive表的讀許可權控制
sparksql 對所有hive表都有讀許可權,目前社群也遇到相似問題;
Intel大神提交了一個patch(https://issues.apache.org/jira/browse/SPARK-8321)目前還在討論合併到社群事宜。
該patch的解決思路:在執行計劃中新增一個authorized模組,採用hive的AuthorizerV2認證機制對當前的logicalPlan進行認證;目前我們採用的是AuthorizerV1認證方式,直接採用該patch需要升級Authorizer方式。
我們提出的短期的解決方式是:在parsesql模組,直接呼叫AuthorizerV1方式對select 語句進行許可權檢查,這樣的缺點需要生成hive的logicalPlan去進行許可權檢查,目前還在測試.
7) Dynamic Resource Allocation報錯
測試下面程式碼:
sc.parallelize(1 to 2000000000,20).map(x=>x%3-0.1).reduce(_+_)
sc.parallelize(1 to 2000000000,40).map(x=>x%3-0.1).reduce(_+_)
sc.parallelize(1 to 2000000000,80).map(x=>x%3-0.1).reduce(_+_)
當executor退出的時候driverstderror出現了下面的錯誤:
6/02/17 17:48:32 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:50558] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
16/02/17 17:48:33 ERROR YarnScheduler: Lost executor 4 on namenode1-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:33 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:56181] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
16/02/17 17:48:33 ERROR YarnScheduler: Lost executor 1 on namenode1-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:33 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:39840] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
16/02/17 17:48:34 ERROR YarnScheduler: Lost executor 5 on namenode2-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:34 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:33914] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
16/02/17 17:48:34 ERROR YarnScheduler: Lost executor 3 on slave01-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:34 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:52934] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
16/02/17 17:48:34 ERROR YarnScheduler: Lost executor 8 on namenode1-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:34 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:55408] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
16/02/17 17:48:37 ERROR YarnScheduler: Lost executor 7 on slave01-sit.cnsuning.com: remote Rpc client disassociated
16/02/17 17:48:37 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:39890] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
目前還在研究該錯誤,本次升級放棄DynamicResource Allocation 功能。
8) 使用spark.cores.max 打散資料
val maxCores=conf.get("spark.executor.instances").toInt
if(maxCores > 0){
conf.set("spark.default.parallelism",(3*maxCores).toString)
conf.set("spark.sql.shuffle.partitions",(3*maxCores).toString)
}
不要使用,yarn 模式下面spark.cores.max不生效
val maxCores=conf.get("spark.cores.max").toInt
if(maxCores > 0){
conf.set("spark.default.parallelism",(3*maxCores).toString)
conf.set("spark.sql.shuffle.partitions",(3*maxCores).toString)
}