Spark SQL Operator Example
The example below builds a DataFrame from an RDD[Row] with an explicit schema, then sums the amount column per date, once saved as JSON for large results and once collected for display.

package sqlText

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by xiaoxu
 */
object SparkSQLAgg {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\winutils-hadoop-2.6.4\\hadoop-2.6.4")
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getName)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val userData = Array(
      "2016-04-15,1001,http://spark.apache.org,1000",
      "2016-04-15,1001,http://hadoop.apache.org,1001",
      "2016-04-15,1002,http://fink.apache.org,1002",
      "2016-04-16,1003,http://kafka.apache.org,1020",
      "2016-04-16,1004,http://spark.apache.org,1010",
      "2016-04-16,1002,http://hive.apache.org,1200",
      "2016-04-16,1001,http://parquet.apache.org,1500",
      "2016-04-16,1001,http://spark.apache.org,1800"
    )

    // Parse each CSV line into a Row: date (dashes stripped), user id, url, amount
    val parallelize: RDD[String] = sc.parallelize(userData)
    val userDateRDDRow = parallelize.map { row =>
      val splitted = row.split(",")
      Row(splitted(0).replaceAll("-", ""), splitted(1).toInt, splitted(2), splitted(3).toInt)
    }

    // Define schema fields matching the data so columns can be queried by name later
    val structTypes = StructType(Array(
      StructField("date", StringType, true),
      StructField("id", IntegerType, true),
      StructField("url", StringType, true),
      StructField("amount", IntegerType, true)
    ))

    val createDataFrame = sqlContext.createDataFrame(userDateRDDRow, structTypes)

    // Sum the amount for each date and save as JSON; supply a real output path in place of ""
    createDataFrame.groupBy("date").agg("amount" -> "sum").write.json("")

    // Sum the amount for each date and print it, e.g. [20160415,3003] and [20160416,6530]
    // (order may vary). With a large data set, do not use collect; persist the result
    // with write.json(...) as above instead.
    createDataFrame.groupBy("date").agg("amount" -> "sum")
      .map(row => Row(row(0), row(1)))
      .collect.foreach(println)

    // Stop the program
    sc.stop()
  }
}
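The code above targets the Spark 1.x SQLContext API. For reference, here is a minimal sketch of the same aggregation with the Spark 2.x SparkSession API; it is not from the original post, assumes Spark 2.x is on the classpath, and the object name SparkSQLAgg2 is hypothetical.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkSQLAgg2 {
  def main(args: Array[String]): Unit = {
    // SparkSession replaces the SparkConf/SparkContext/SQLContext setup of Spark 1.x
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SparkSQLAgg2")
      .getOrCreate()

    // Same sample data as the example above
    val userData = Seq(
      "2016-04-15,1001,http://spark.apache.org,1000",
      "2016-04-15,1001,http://hadoop.apache.org,1001",
      "2016-04-15,1002,http://fink.apache.org,1002",
      "2016-04-16,1003,http://kafka.apache.org,1020",
      "2016-04-16,1004,http://spark.apache.org,1010",
      "2016-04-16,1002,http://hive.apache.org,1200",
      "2016-04-16,1001,http://parquet.apache.org,1500",
      "2016-04-16,1001,http://spark.apache.org,1800"
    )

    // Parse each CSV line into a Row, stripping dashes from the date
    val rows = spark.sparkContext.parallelize(userData).map { line =>
      val f = line.split(",")
      Row(f(0).replaceAll("-", ""), f(1).toInt, f(2), f(3).toInt)
    }

    val schema = StructType(Array(
      StructField("date", StringType, true),
      StructField("id", IntegerType, true),
      StructField("url", StringType, true),
      StructField("amount", IntegerType, true)
    ))

    val df = spark.createDataFrame(rows, schema)

    // Same aggregation: total amount per date, displayed as a table
    df.groupBy("date").agg("amount" -> "sum").show()

    spark.stop()
  }
}

With this sample data, show() prints two rows: 20160415 with sum(amount) 3003 and 20160416 with 6530.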