1. 程式人生 > >SparkSQL讀取Hive中的資料

SparkSQL讀取Hive中的資料

注意紅色字。

----------------------

由於我Spark採用的是Cloudera公司的CDH,並且安裝的時候是線上自動安裝和部署的叢集。最近在學習SparkSQL,看到SparkSQL on HIVE。下面主要是介紹一下如何通過SparkSQL在讀取HIVE的資料。

(說明:如果不是採用CDH線上自動安裝和部署的話,可能需要對原始碼進行編譯,使它能夠相容HIVE。

編譯的方式也很簡單,只需要在Spark_SRC_home(原始碼的home目錄下)執行如下命令:

./make-distribution.sh --tgz -Phadoop-2.2 -Pyarn -DskipTests -Dhadoop.version=2.6.0-cdh5.4.4 -Phive

編譯好了之後,會在lib目錄下多幾個jar包。)

下面我主要介紹一下我使用的情況:

1、為了讓Spark能夠連線到Hive的原有資料倉庫,我們需要將Hive中的hive-site.xml檔案拷貝到Spark的conf目錄下,這樣就可以通過這個配置檔案找到Hive的元資料以及資料存放。

在這裡由於我的Spark是自動安裝和部署的,因此需要知道CDH將hive-site.xml放在哪裡。經過摸索。該檔案預設所在的路徑是:/etc/hive/conf 下。

同理,spark的conf也是在/etc/spark/conf。

此時,如上所述,將對應的hive-site.xml拷貝到spark/conf目錄下即可

  如果Hive的元資料存放在Mysql中,我們還需要準備好Mysql相關驅動,比如:mysql-connector-java-5.1.22-bin.jar。

2、編寫測試程式碼

1 2 3 4 5 6 7 8 9 10 11 12 13 val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local") val sc=new SparkContext(conf) //create hivecontext val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ")   //這裡需要注意資料的間隔符 sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src  "); sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println) sc.stop()

 3、下面列舉一下出現的問題:

(1)如果沒有將hive-site.xml拷貝到spark/conf目錄下,會出現:

分析:從錯誤提示上面就知道,spark無法知道hive的元資料的位置,所以就無法例項化對應的client。

解決的辦法就是必須將hive-site.xml拷貝到spark/conf目錄下

(2)測試程式碼中沒有加sc.stop會出現如下錯誤:

ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException

在程式碼最後一行新增sc.stop()解決了該問題。