Operating on a Hive Data Warehouse with DataFrames [Pure Java Code]
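This example (Spark 1.x API) creates two Hive tables from local tab-delimited files, joins them with HiveQL, and writes the qualifying rows back into Hive. It assumes two input files, ./student_infos (name, age) and ./student_scores (name, score), exist in the working directory; hypothetical contents (tab-separated, names invented for illustration) might look like:

student_infos:
zhangsan	18
lisi	19

student_scores:
zhangsan	88
lisi	70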
package com.bjsxt;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;
public class CreateDFFromHiveLocalTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("hive").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // SQLContext sqlContext = new SQLContext(sc);
        // HiveContext is a subclass of SQLContext
        HiveContext hiveContext = new HiveContext(sc);
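        // HiveContext reads hive-site.xml from the classpath to locate the metastore;
        // without one, Spark falls back to a local embedded (Derby) metastore.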
hiveContext.sql("USE spark");
hiveContext.sql("DROP TABLE IF EXISTS student_infos");
//在hive中建立表student_infos表
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t'");
hiveContext.sql("load data local inpath './student_infos' into table student_infos");
hiveContext.sql("DROP TABLE IF EXISTS student_scores");
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING,score INT) row format delimited fields terminated by '\t'");
        hiveContext.sql(
                "LOAD DATA "
                + "LOCAL INPATH './student_scores' "   // note the trailing space: without it the concatenated SQL fails to parse
                + "INTO TABLE student_scores"
        );
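        // LOAD DATA LOCAL INPATH copies the file from the driver-local filesystem into the
        // table's warehouse directory; a relative path resolves against the driver's working directory.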
        /**
         * Query the tables
         */
        DataFrame df = hiveContext.table("student_infos"); // a second way to load a Hive table into a DataFrame
        DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score "
                + "FROM student_infos si "
                + "JOIN student_scores ss "
                + "ON si.name = ss.name "
                + "WHERE ss.score >= 80"
        );
hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
goodStudentsDF.registerTempTable("goodstudent");
DataFrame result = hiveContext.sql("select * from goodstudent");
result.show();
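        // registerTempTable registers a session-scoped view in the catalog;
        // it does not persist anything to the Hive metastore.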
        /**
         * Save the result into the Hive table good_student_infos
         */
        goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
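        // SaveMode.Overwrite replaces any existing good_student_infos table;
        // the other options are Append, Ignore, and ErrorIfExists.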
        Row[] goodStudentsRows = hiveContext.table("good_student_infos").collect();
        for (Row goodStudentsRow : goodStudentsRows) {
            System.out.println(goodStudentsRow);
        }
        sc.stop();
    }
}
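To run this, Spark must be built with Hive support and the two input files must be present. Note that HiveContext and the Java DataFrame class were removed in Spark 2.x; the same flow uses SparkSession with Hive support enabled. A minimal sketch of the equivalent (class name CreateDFFromHiveSpark2 is hypothetical, assuming spark-hive is on the classpath):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CreateDFFromHiveSpark2 {
    public static void main(String[] args) {
        // enableHiveSupport() takes the place of constructing a HiveContext
        SparkSession spark = SparkSession.builder()
                .appName("hive")
                .master("local")
                .enableHiveSupport()
                .getOrCreate();
        spark.sql("USE spark");
        // Dataset<Row> replaces the removed Java DataFrame class
        Dataset<Row> goodStudents = spark.sql(
                "SELECT si.name, si.age, ss.score FROM student_infos si "
                + "JOIN student_scores ss ON si.name = ss.name WHERE ss.score >= 80");
        goodStudents.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
        spark.stop();
    }
}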