3.sparkSQL整合Hive
spark SQL經常需要訪問Hive metastore,Spark SQL可以通過Hive metastore獲取Hive表的元數據。從Spark 1.4.0開始,Spark SQL只需簡單的配置,就支持各版本Hive metastore的訪問。註意,涉及到metastore時Spar SQL忽略了Hive的版本。Spark SQL內部將Hive反編譯至Hive 1.2.1版本,Spark SQL的內部操作(serdes, UDFs, UDAFs, etc)都調用Hive 1.2.1版本的class。
原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6618841.html
Spark SQL和hive共用一套元數據庫
Spark SQL自己也可創建元數據庫,並不一定要依賴hive創建元數據庫,所以不需要一定啟動hive,只要有元數據庫,Spark SQL就可以使用。但是如果要像hive一樣持久化文件與表的關系就要使用hive,當然可以不啟動hive程序使用spark提供的HiveContext類即可。 1.將hive的hive-site.xml拷貝到放入$SPARK-HOME/conf目錄下,裏面配置的是Hive metastore元數據存放在數據庫的位置,當然如果數據庫不存在,我們可以定義一個數據庫,然後程序在spark集群運行的時候就會自動創建對應的元數據庫。<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://192.168.19.131:3306/hivedb?createDatabaseIfNotExist=true</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>hadoop</value> </property> </configuration>2.如果hdfs配置了高可用,則還要把hadoop集群中的hdfs-site.xml和core-site.xml文件拷貝到spark/conf文件夾下面。 3.啟動spark-shell時指定mysql連接驅動位置 spark集群模式
bin/spark-shell --master spark://intsmaze:7077 --executor-memory 512m --total-executor-cores 2 --driver-class-path /home/intsmaze/mysql-connector-java-5.1.35-bin.jar
sprk on yarn模式
bin/spark-shell --master yarn --executor-memory 512m --total-executor-cores 2 --driver-class-path /home/intsmaze/mysql-connector-java-5.1.35-bin.jar
4.執行sql語句
使用sqlContext.sql調用HQL
val rdd=sqlContext.sql("select * from default.person limit 2")//現在就可以直接使用sql語句了,只是要指定查詢哪個庫的哪張表。
rdd.write.json("hdfs://192.168.19.131:9000/personresult")
使用org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)
hiveContext.sql("select * from default.person ")
5.使用sprk-sql命令啟動shell模式
啟動spark-sql時指定mysql連接驅動位置(啟動spark-sql那麽就和hive的操作一樣,裏面可以直接寫sql語句進行操作)
bin/spark-sql--master spark://intsmaze:7077 --executor-memory 512m --total-executor-cores 3 --driver-class-path /home/intsmaze/mysql-connector-java-5.1.35-bin.jar
裏面直接寫sql語句。
select * from default.person limit 2
spark sql如何向元數據中添加數據?因為元數據庫中只是存放表對應數據在hdfs的地址,並沒有存放表的數據信息,spark sql可以創建表,但是無法向表中添加數據比如insert語句。註意與把DF數據存儲到數據庫不是一個概念。
6.Thrift JDBC/ODBC server
Spark SQL實現Thrift JDBC/ODBC server,這就意味著我們可以像HIVE那樣通過JDBC遠程連接Spark SQL發送SQL語句並執行。在這之前需要先將${HIVE_HOME}/conf/hive-site.xml 拷貝到${SPARK_HOME}/conf目錄下,由於我的hive配置了元數據信息存儲在MySQL中,所以Spark在訪問這些元數據信息時需要mysql連接驅動的支持。
添加驅動的方式有三種:
第一種是在${SPARK_HOME}/conf目錄下的spark-defaults.conf中添加:spark.jars /intsmaze/lib/mysql-connector-java-5.1.26-bin.jar。
第二種是通過添加 :spark.driver.extraClassPath /intsmaze/lib2/mysql-connector-java-5.1.26-bin.jar這種方式也可以實現添加多個依賴jar,比較方便。
第三種是在運行時添加 --jars /intsmaze/lib2/mysql-connector-java-5.1.26-bin.jar。
啟動thrift
在spark根目錄下執行:./sbin/start-thriftserver.sh 開啟thrift服務器。
./start-thriftserver.sh --jars /home/hadoop/mysql-connector-java-5.1.35-bin.jar --master yarn
start-thriftserver.sh 和spark-submit的用法類似,可以接受所有spark-submit的參數,並且還可以接受--hiveconf 參數。不添加任何參數表示以local方式運行,默認的監聽端口為10000
用beeline測試
在spark根目錄下執行: ./bin/beeline連接 JDBC/ODBC server beeline> !connect jdbc:hive2://localhost:10000
連接後會提示輸入用戶名和密碼,用戶名可以填當前登陸的linux用戶名,密碼為空即可。
在java代碼中用jdbc連接
接下來打開eclipse用jdbc連接hiveserver2,連接hive的步驟同樣如此。 在pom.xml添加以下依賴: <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.6</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
驅動:org.apache.hive.jdbc.HiveDriver
url:jdbc:hive2://192.168.19.131:10000/default
用戶名:hadoop (啟動thriftserver的linux用戶名)
密碼:“”(默認密碼為空)
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class Test1 {
public static void main(String[] args) throws SQLException {
String url = "jdbc:hive2://192.168.19.131:10000/default";
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
Connection conn = DriverManager.getConnection(url,"hadoop","");
Statement stmt = conn.createStatement();
String sql = "SELECT * FROM personlimit 10";
ResultSet res = stmt.executeQuery(sql);
while(res.next()){
System.out.println("id: "+res.getInt(1)+"\tname: "+res.getString(2)+"\tage:" + res.getInt(3));
}
}
}
這種方式,可以在yarn的管理界面看到,會長起一個任務,該任務負責跑sql語句,但是不能並行跑sql語句,就是同時為兩個用戶輸入的查詢語句同時跑,必須等一個跑完了再跑第二個。
spark sql可視化
第一種方案:
將spark sql代碼打包,sql語句和結果存儲位置作為參數,java代碼收集這些參數後,組裝為命令,調用腳本來向集群提交jar包。
第二種方案:
根據Spark官網所述,Spark SQL實現了Thrift JDBC/ODBC server
最後,這篇文章很久了,一直編輯沒有發布,我現在已經一年不搞spark了,專註java核心技術的研究。
3.sparkSQL整合Hive