1. 程式人生 > >Spark on Yarn with Hive實戰案例與常見問題解決

Spark on Yarn with Hive實戰案例與常見問題解決

ast spa dfs img 運維 base kcon 運維人員 來看

[TOC]


1 場景

在實際過程中,遇到這樣的場景:

日誌數據打到HDFS中,運維人員將HDFS的數據做ETL之後加載到hive中,之後需要使用Spark來對日誌做分析處理,Spark的部署方式是Spark on Yarn的方式。

從場景來看,需要在我們的Spark程序中通過HiveContext來加載hive中的數據。

如果希望自己做測試,環境的配置可以參考我之前的文章,主要有下面的需要配置:

  • 1.Hadoop環境
    • Hadoop環境的配置可以參考之前寫的文章;
  • 2.Spark環境
    • Spark環境只需要在提交job的節點上進行配置即可,因為使用的是Spark on Yarn的方式;
  • 3.Hive環境
    • 需要配置好Hive環境,因為在提交Spark任務時,需要連同hive-site.xml文件一起提交,因為只有這樣才能夠識別已有的hive環境的元數據信息;
    • 所以其實中Spark on Yarn的部署模式中,需要的只是hive的配置文件,以讓HiveContext能夠讀取存儲在mysql中的元數據信息以及存儲在HDFS上的hive表數據;
    • hive環境的配置可以參考之前的文章;

其實之前已經有寫過Spark Standalone with Hive的文章,可以參考:《Spark SQL筆記整理(三):加載保存功能與Spark SQL函數》。

2 編寫程序與打包

作為一個測試案例,這裏的測試代碼比較簡單,如下:

package cn.xpleaf.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author xpleaf
  */
object _01HiveContextOps {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
//            .setMaster("local[2]")
            .setAppName(s"${_01HiveContextOps.getClass.getSimpleName}")

        val sc = new SparkContext(conf)
        val hiveContext = new HiveContext(sc)

        hiveContext.sql("show databases").show()

        hiveContext.sql("use mydb1")
        // 創建teacher_info表
        val sql1 = "create table teacher_info(\n" + "name string,\n" + "height double)\n" + "row format delimited\n" + "fields terminated by ‘,‘"
        hiveContext.sql(sql1)

        // 創建teacher_basic表
        val sql2 = "create table teacher_basic(\n" + "name string,\n" + "age int,\n" + "married boolean,\n" + "children int)\n" + "row format delimited\n" + "fields terminated by ‘,‘"
        hiveContext.sql(sql2)

        // 向表中加載數據
        hiveContext.sql("load data inpath ‘hdfs://ns1/data/hive/teacher_info.txt‘ into table teacher_info")
        hiveContext.sql("load data inpath ‘hdfs://ns1/data/hive/teacher_basic.txt‘ into table teacher_basic")

        // 第二步操作:計算兩張表的關聯數據
        val sql3 = "select\n" + "b.name,\n" + "b.age,\n" + "if(b.married,‘已婚‘,‘未婚‘) as married,\n" + "b.children,\n" + "i.height\n" + "from teacher_info i\n" + "inner join teacher_basic b on i.name=b.name"
        val joinDF:DataFrame = hiveContext.sql(sql3)

        val joinRDD = joinDF.rdd
        joinRDD.collect().foreach(println)

        joinDF.write.saveAsTable("teacher")

        sc.stop()
    }

}

可以看到其實只是簡單的在hive中建表、加載數據、關聯數據與保存數據到hive表中。

編寫完成之後打包就可以了,註意不需要將依賴一起打包。之後就可以把jar包上傳到我們的環境中了。

3 部署

編寫submit腳本,如下:

[hadoop@hadoop01 jars]$ cat spark-submit-yarn.sh 
/home/hadoop/app/spark/bin/spark-submit --class $2 --master yarn --deploy-mode cluster --executor-memory 1G --num-executors 1 --files $SPARK_HOME/conf/hive-site.xml --jars $SPARK_HOME/lib/mysql-connector-java-5.1.39.jar,$SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar $1 \

註意其中非常關鍵的--files--jars,說明如下:

--files $HIVE_HOME/conf/hive-site.xml    //將Hive的配置文件添加到Driver和Executor的classpath中
--jars $HIVE_HOME/lib/mysql-connector-java-5.1.39.jar,….    //將Hive依賴的jar包添加到Driver和Executor的classpath中

之後就可以執行腳本,將任務提交到Yarn上:

[hadoop@hadoop01 jars]$ ./spark-submit-yarn.sh spark-process-1.0-SNAPSHOT.jar cn.xpleaf.spark.scala.sql.p2._01HiveContextOps

4 查看結果

需要說明的是,如果需要對執行過程進行監控,就需要進行配置historyServer(mr的jobHistoryServer和spark的historyServer),可以參考我之前寫的文章。

4.1 Yarn UI

技術分享圖片

技術分享圖片

4.2 Spark UI

技術分享圖片

技術分享圖片

4.3 Hive

可以啟動hive,然後查看我們的spark程序加載的數據:

hive (mydb1)> 
            > 
            > 
            > show tables;
OK
t1
t2
t3_arr
t4_map
t5_struct
t6_emp
t7_external
t8_partition
t8_partition_1
t8_partition_copy
t9
t9_bucket
teacher
teacher_basic
teacher_info
test
tid
Time taken: 0.057 seconds, Fetched: 17 row(s)
hive (mydb1)> select *
            > from teacher_info;
OK
zhangsan        175.0
lisi    180.0
wangwu  175.0
zhaoliu 195.0
zhouqi  165.0
weiba   185.0
Time taken: 1.717 seconds, Fetched: 6 row(s)
hive (mydb1)> select *
            > from teacher_basic;
OK
zhangsan        23      false   0
lisi    24      false   0
wangwu  25      false   0
zhaoliu 26      true    1
zhouqi  27      true    2
weiba   28      true    3
Time taken: 0.115 seconds, Fetched: 6 row(s)
hive (mydb1)> select *
            > from teacher;
OK
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
zhangsan        23      未婚    0       175.0
lisi    24      未婚    0       180.0
wangwu  25      未婚    0       175.0
zhaoliu 26      已婚    1       195.0
zhouqi  27      已婚    2       165.0
weiba   28      已婚    3       185.0
Time taken: 0.134 seconds, Fetched: 6 row(s)

5 問題與解決

1.User class threw exception: java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

註意我們的Spark部署模式是Yarn,yarn上面是沒有相關spark和hive的相關依賴的,所以在提交任務時,必須要指定要上傳的jar包依賴:

--jars $SPARK_HOME/lib/mysql-connector-java-5.1.39.jar,$SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar \

其實在提交任務時,註意觀察控制臺的輸出:

18/10/09 10:57:44 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/spark-assembly-1.6.2-hadoop2.6.0.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/spark-assembly-1.6.2-hadoop2.6.0.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/jars/spark-process-1.0-SNAPSHOT.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/spark-process-1.0-SNAPSHOT.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/mysql-connector-java-5.1.39.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/mysql-connector-java-5.1.39.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/datanucleus-api-jdo-3.2.6.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/datanucleus-api-jdo-3.2.6.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/datanucleus-core-3.2.10.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/datanucleus-core-3.2.10.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/lib/datanucleus-rdbms-3.2.9.jar -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/datanucleus-rdbms-3.2.9.jar
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark/conf/hive-site.xml -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/hive-site.xml
18/10/09 10:57:47 INFO yarn.Client: Uploading resource file:/tmp/spark-6f582e5c-3eef-4646-b8c7-0719877434d8/__spark_conf__103916311924336720.zip -> hdfs://ns1/user/hadoop/.sparkStaging/application_1538989570769_0023/__spark_conf__103916311924336720.zip

也可以看到,其會將相關spark相關的jar包上傳到yarn的環境也就是hdfs上,之後再執行相關的任務。

2.User class threw exception: org.apache.spark.sql.execution.QueryExecutionException: FAILED: SemanticException [Error 10072]: Database does not exist: mydb1

mydb1不存在,說明沒有讀取到我們已有的hive環境的元數據信息,那是因為在提交任務時沒有指定把hive-site.xml配置文件一並提交,如下:

--files $SPARK_HOME/conf/hive-site.xml \

Spark on Yarn with Hive實戰案例與常見問題解決