Lesson 67: A Spark SQL Join Case Implemented in Java and Scala (Consolidating the Spark SQL Material Covered So Far)
Posted by 阿新 on 2018-12-20
Contents:
1. Spark SQL case analysis
2. Implementing the case in Java and Scala on Spark SQL
I. Implementing the case in Java and Scala on Spark SQL
Student scores:
{"name":"Michael","score":98}
{"name":"Andy", "score":95}
{"name":"Justin", "score":91}
1. The Join case implemented in Java on Spark SQL
package SparkSQL;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * FileName: SparkSQLWithJoin
 * Author: hadoop
 * Email: [email protected]
 * Date: 18-11-8 10:48 AM
 * Description:
 */
public class SparkSQLWithJoin {
    public static void main(String[] args) {
        // Create a SparkConf to read system information and set the application name
        SparkConf conf = new SparkConf().setAppName("SparkSQLWithJoin").setMaster("local");
        // Create the JavaSparkContext instance, the cornerstone of the whole Driver
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Set the log output level
        sc.setLogLevel("ERROR");
        // Create the SQLContext, the entry point for SQL analysis
        SQLContext sqlContext = new SQLContext(sc);
        // Create a DataFrame; a DataFrame can be thought of simply as a table
        Dataset<Row> peopleDS = sqlContext.read().json("file:///home/hadoop/people.json");
        // Register the JSON-backed DataFrame as a temporary table
        peopleDS.registerTempTable("peopleScore");
        // Select the records with a score of at least 90
        Dataset<Row> excellentScore = sqlContext.sql("select name, score from peopleScore where score >= 90");
        /**
         * Convert the DataFrame to an RDD and, via a map operation, collect the names
         * of everyone whose score is at least 90
         */
        List<String> excellentScoreNameList = excellentScore.javaRDD().map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getAs("name");
            }
        }).collect();

        // Dynamically assemble the JSON records
        List<String> peopleInformations = new ArrayList<String>();
        peopleInformations.add("{\"name\":\"Michael\",\"age\":20}");
        peopleInformations.add("{\"name\":\"Andy\", \"age\":30}");
        peopleInformations.add("{\"name\":\"Justin\", \"age\":19}");
        // Build a DataFrame from an RDD whose contents are JSON strings
        JavaRDD<String> peopleInformationRDD = sc.parallelize(peopleInformations);
        Dataset<Row> peopleInformationDS = sqlContext.read().json(peopleInformationRDD);
        // Register it as a temporary table
        peopleInformationDS.registerTempTable("peopleInformations");
        // SQL statement that selects the name and age of the top-scoring students
        String sqlText = "select name, age from peopleInformations where name in (";
        for (int i = 0; i < excellentScoreNameList.size(); i++) {
            sqlText += "'" + excellentScoreNameList.get(i) + "'";
            if (i < excellentScoreNameList.size() - 1) {
                sqlText += ",";
            }
        }
        sqlText += ")";
        // Run the SQL statement, obtaining a Dataset
        Dataset<Row> excellentNameAgeDS = sqlContext.sql(sqlText);
        // Join the top students' scores with their ages
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = excellentScore.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(row.getAs("name"), (int) row.getLong(1));
            }
        }).join(excellentNameAgeDS.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(row.getAs("name"), (int) row.getLong(1));
            }
        }));
        // Use the RowFactory factory method to build the result records
        JavaRDD<Row> resultRowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                return RowFactory.create(tuple._1, tuple._2._2, tuple._2._1);
            }
        });
        /**
         * Dynamically construct the DataFrame's metadata; in general, the number of
         * columns and each column's type may come from a JSON file or from a database
         */
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        // Build the StructType that describes the final DataFrame's metadata
        StructType structType = DataTypes.createStructType(structFields);
        // Create the result Dataset
        Dataset<Row> personDS = sqlContext.createDataFrame(resultRowRDD, structType);
        personDS.show();
        personDS.write().format("json").save("file:///home/hadoop/peopleResult.json");
        sc.close();
    }
}
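The in-clause above is assembled by collecting the names to the driver and concatenating them into a SQL string. For reference, the same result can be expressed as a single DataFrame join, which keeps the data on the cluster and needs no hand-built SQL. This is a minimal sketch rather than the lesson's code: it assumes Spark 2.x's SparkSession entry point and a hypothetical ages.json file holding the name/age records that the Java program builds inline.

import org.apache.spark.sql.SparkSession

object JoinWithDataFrameAPI {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JoinWithDataFrameAPI").master("local").getOrCreate()
    // name/score records, as in this lesson
    val scores = spark.read.json("file:///home/hadoop/people.json")
    // hypothetical file with the name/age records
    val ages = spark.read.json("file:///home/hadoop/ages.json")
    // Filter first, then join on the shared "name" column
    scores.filter("score >= 90")
      .join(ages, "name")
      .select("name", "age", "score")
      .show()
    spark.stop()
  }
}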
2. The Join case implemented in Scala on Spark SQL
package SparkSQL

import org.apache.spark.sql.{RowFactory, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * FileName: SparkSQLWithJoinScala
 * Author: hadoop
 * Email: [email protected]
 * Date: 18-11-8 5:06 PM
 * Description:
 */
object SparkSQLWithJoinScala {
  def main(args: Array[String]): Unit = {
    // Create a SparkConf to read system information and set the application name
    val conf = new SparkConf().setAppName("SparkSQLWithJoinScala").setMaster("local")
    // Create the SparkContext instance, the cornerstone of the whole Driver
    val sc = new SparkContext(conf)
    // Keep the log output readable; WARN or ERROR both work here
    sc.setLogLevel("ERROR")
    // Create the SQLContext, the entry point for SQL analysis
    val sqlContext = new SQLContext(sc)
    // Create a DataFrame; a DataFrame can be thought of simply as a table
    val personScoreDS = sqlContext.read.json("file:///home/hadoop/score.json")
    // Register the score table as a temporary table
    personScoreDS.registerTempTable("personScores")
    // Select the records of the top-scoring students
    val excellentStudentsDS = sqlContext.sql("select name, score from personScores where score >= 90")
    // Extract the names of the top-scoring students
    val excellentStudentName = excellentStudentsDS.rdd.map(_(0))
    // Student information: name and age
    val peopleInformations = Array(
      "{\"name\":\"Michael\", \"age\":20}",
      "{\"name\":\"Andy\", \"age\":17}",
      "{\"name\":\"Justin\", \"age\":19}"
    )
    // Parallelize the student information into an RDD
    val peopleInformationRDD = sc.parallelize(peopleInformations)
    // Read the RDD of JSON strings as a DataFrame
    val peopleInformationDS = sqlContext.read.json(peopleInformationRDD)
    // Register the student information as a temporary table
    peopleInformationDS.registerTempTable("peopleInformations")
    /**
     * Query the information of the top-scoring students
     */
    val sqlText = new StringBuilder()
    sqlText.append("select name,age from peopleInformations where name in (")
    val students: Array[Any] = excellentStudentName.collect()
    for (i <- 0 until students.size) {
      sqlText.append("'" + students(i).toString + "'")
      if (i != students.size - 1) sqlText.append(",")
    }
    sqlText.append(")")
    val sqlString = sqlText.toString()
    val excellentStudentNameAgeDS = sqlContext.sql(sqlString)
    // Join the students' score table with their information table
    val resultRDD = excellentStudentsDS.rdd
      .map(row => (row.getAs[String]("name"), row.getLong(1)))
      .join(excellentStudentNameAgeDS.rdd.map(line => (line.getAs[String]("name"), line.getLong(1))))
    /**
     * Reshape the joined records into Rows
     */
    val resultRowRDD = resultRDD.map(tuple => {
      val name = tuple._1
      val age: java.lang.Integer = tuple._2._2.toInt
      val score: java.lang.Integer = tuple._2._1.toInt
      RowFactory.create(name, age, score)
    })
    // Create the result DataFrame via the case class below
    val personDS = sqlContext.createDataFrame(resultRowRDD.map(row => PersonAgeScore(row.getString(0), row.getInt(1), row.getInt(2))))
    personDS.show()
    personDS.write.json("file:///home/hadoop/json")
  }
}

case class PersonAgeScore(name: String, age: Int, score: Int)
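With this lesson's sample data all three students clear the 90-point cutoff, so personDS.show() should print something close to the table below (row order after a join is not guaranteed):

+-------+---+-----+
|   name|age|score|
+-------+---+-----+
|Michael| 20|   98|
|   Andy| 17|   95|
| Justin| 19|   91|
+-------+---+-----+

Note that file:///home/hadoop/json names an output directory: Spark writes the result as part files inside it, not as a single JSON file.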