Hive-On-Spark
1 HiveOnSpark簡介
Hive On Spark (跟hive沒太大的關係,就是使用了hive的標準(HQL, 元資料庫、UDF、序列化、反序列化機制))
Hive原來的計算模型是MR,有點慢(將中間結果寫入到HDFS中)
Hive On Spark 使用RDD(DataFrame),然後執行在spark 叢集上
真正要計算的資料是儲存在HDFS中,mysql這個元資料庫,儲存的是hive表的描述資訊,描述了有哪些database、table、以及表有多少列,每一列是什麼型別,還要描述表的資料儲存在hdfs的什麼位置?
hive跟mysql的區別?
hive是一個數據倉庫(儲存資料並分析資料,分析資料倉庫中的資料量很大,一般要分析很長的時間)
mysql是一個關係型資料庫(關係型資料的增刪改查(低延遲))
hive的元資料庫中儲存要計算的資料嗎?
不儲存,儲存hive倉庫的表、欄位、等描述資訊
真正要計算的資料儲存在哪裡了?
儲存在HDFS中了
hive的元資料庫的功能
建立了一種對映關係,執行HQL時,先到MySQL元資料庫中查詢描述資訊,然後根據描述資訊生成任務,然後將任務下發到spark叢集中執行
hive on spark 使用的僅僅是hive的標準,規範,不需要有hive資料庫一樣可行。
hive : 元資料,是存放在mysql中,然後真正的資料是存放在hdfs中。
2 安裝mysql
mysql資料庫作為hive使用的元資料
3 配置HiveOnSpark
生成hive的元資料庫表,根據hive的配置檔案,生成對應的元資料庫表。
spark-sql 是spark專門用於編寫sql的互動式命令列。
當直接啟動spark-sql以local模式執行時,如果報錯:
是因為配置了Hadoop的配置引數導致的:
執行測試命令:
create table test (name string);
insert into test values(“xxtest”);
local模式下,預設使用derby資料庫,資料儲存於本地位置。
要想使用hive的標準,需要把hive的配置檔案放到spark的conf目錄下
cd /root/apps/spark-2.2.0-bin-hadoop2.7/conf/
vi hive-site.xml
hive-site.xml檔案:
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hdp-01:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>password to use against metastore database</description>
</property>
</configuration>
把該配置檔案,傳送給叢集中的其他節點:
cd /root/apps/spark-2.2.0-bin-hadoop2.7/conf/
for i in 2 3 ;do scp hive-site.xml hdp-0$i:`pwd` ;done
重新停止並重啟spark: start-all.sh
啟動spark-sql時,
出現如下錯誤是因為操作mysql時缺少mysql的驅動jar包,
解決方案1:--jars 或者 --driver-class-path 引入msyql的jar包
解決方案2: 把mysql的jar包新增到$spark_home/jars目錄下
啟動時指定叢集:(如果不指定master,預設就是local模式)
spark-sql --master spark://hdp-01:7077 --jars /root/mysql-connector-java-5.1.38.jar
sparkSQL會在mysql上建立一個database,需要手動改一下DBS表中的DB_LOCATION_UIR改成hdfs的地址
hdfs://hdp-01:9000/user/hive/spark-warehouse
也需要檢視一下,自己建立的資料庫表的儲存路徑是否是hdfs的目錄。
執行spark-sql任務之後:可以在叢集的監控介面檢視
同樣 ,會有SparkSubmit程序存在。
4 IDEA程式設計
要先開啟spark對hive的支援
//如果想讓hive執行在spark上,一定要開啟spark對hive的支援
val session = SparkSession.builder()
.master("local")
.appName("xx")
.enableHiveSupport() // 啟動對hive的支援, 還需新增支援jar包
.getOrCreate()
要新增spark對hive的相容jar包
<!--sparksql對hive的支援-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
在本地執行,還需把hive-site.xml檔案拷貝到resource目錄下。
resources目錄,存放著當前專案的配置檔案
編寫程式碼,local模式下測試:
// 執行查詢
val query = session.sql("select * from t_access_times")
query.show()
// 釋放資源
session.close()
建立表的時候,需要偽裝客戶端身份
System.setProperty("HADOOP_USER_NAME", "root") // 偽裝客戶端的使用者身份為root
// 或者新增執行引數 –DHADOOP_USER_NAME=root
基本操作
// 求每個使用者的每月總金額
// session.sql("select username,month,sum(salary) as salary from t_access_times group by username,month")
// 建立表
// session.sql("create table t_access1(username string,month string,salary int) row format delimited fields terminated by ','")
// 刪除表
// session.sql("drop table t_access1")
// 插入資料
// session.sql("insert into t_access1 select * from t_access_times")
// .show()
// 覆蓋寫資料
// session.sql("insert overwrite table t_access1 select * from t_access_times where username='A'")
// 覆蓋load新資料
// C,2015-01,10
// C,2015-01,20
// session.sql("load data local inpath 't_access_time_log' overwrite into table t_access1")
// 清空資料
// session.sql("truncate table t_access1")
// .show()
// 寫入自定義資料
val access: Dataset[String] = session.createDataset(List("b,2015-01,10", "c,2015-02,20"))
val accessdf = access.map({
t =>
val lines = t.split(",")
(lines(0), lines(1), lines(2).toInt)
}).toDF("username", "month", "salary")
// .show()
accessdf.createTempView("t_ac")
// session.sql("insert into t_access1 select * from t_ac")
// overwrite模式會重新建立新的表 根據指定schema資訊 SaveMode.Overwrite
// 本地模式只支援 overwrite,必須在sparksession上新增配置引數:
// .config("spark.sql.warehouse.dir", "hdfs://hdp-01:9000/user/hive/warehouse")
accessdf
.write.mode("overwrite").saveAsTable("t_access1")
叢集執行:
需要把hive-site.xml配置檔案,新增到$SPARK_HOME/conf目錄中去,重啟spark
上傳一個mysql連線驅動(sparkSubmit也要連線MySQL,獲取元資料資訊)
spark-sql --master spark://hdp-01:7077 --driver-class-path /root/mysql-connector-java-5.1.38.jar
--class xx.jar
然後執行程式碼的編寫:
// 執行查詢 hive的資料表
// session.sql("select * from t_access_times")
// .show()
// 建立表
// session.sql("create table t_access1(username string,month string,salary int) row format delimited fields terminated by ','")
// session.sql("insert into t_access1 select * from t_access_times")
// .show()
// 寫資料
val access: Dataset[String] = session.createDataset(List("b,2015-01,10", "c,2015-02,20"))
val accessdf = access.map({
t =>
val lines = t.split(",")
(lines(0), lines(1), lines(2).toInt)
}).toDF("username", "month", "salary")
accessdf.createTempView("v_tmp")
// 插入資料
// session.sql("insert overwrite table t_access1 select * from v_tmp")
session.sql("insert into t_access1 select * from v_tmp")
// .show()
// insertInto的api 入庫
accessdf.write.insertInto("databaseName.tableName")
session.close()