
Spark SQL Learning (Case Study: Daily Sales Totals)

 

Requirement: compute the total sales amount for each day from a user sale log.


package wujiadong_sparkSQL


import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._

/**
  * Created by Administrator on 2017/3/6.
  */
object DailySale {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dailysale").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // Mock data: date, sale amount, userid; the third record has an
    // empty userid and is meant to be filtered out below
    val userSalelog = Array(
      "2017-02-01,55,1122",
      "2017-02-01,23,1133",
      "2017-02-01,15,",
      "2017-02-02,56,1155",
      "2017-02-01,78,1123",
      "2017-02-01,113,1144"
    )

    val userSalelogRDD = sc.parallelize(userSalelog, 2)
    // Keep only complete records: split(",") drops the trailing empty field,
    // so the line with the missing userid splits into 2 fields and is dropped
    val filteredUserRDD = userSalelogRDD.filter(_.split(",").length == 3)
    // Split each line once and build a Row that matches the schema below
    val rowRDD = filteredUserRDD.map { log =>
      val fields = log.split(",")
      Row(fields(0), fields(1).toInt, fields(2).toInt)
    }
    // Programmatic schema: the column types must match the Row fields built above
    val schema = StructType(
      Array(
        StructField("date", StringType, true),
        StructField("sale_amount", IntegerType, true),
        StructField("userid", IntegerType, true)
      )
    )

    val df = sqlContext.createDataFrame(rowRDD, schema)

    // Group by date and sum the sale amounts. Since Spark 1.4 the grouping
    // column is kept in the result automatically, so it does not need to be
    // repeated inside agg(), and the result rows need no extra wrapping
    df.groupBy("date")
      .agg(sum('sale_amount).as("total_sale"))
      .collect()
      .foreach(println)


    sc.stop()
  }

}
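
The same aggregation can also be written as a SQL query instead of the DataFrame API. A minimal sketch, assuming the df built above; the temp table name daily_sale is illustrative. Either version prints the same two daily totals shown in the output below.

    // Register the DataFrame so it can be queried with SQL
    df.registerTempTable("daily_sale")
    // Backticks around date avoid any clash with the DATE keyword
    sqlContext.sql(
      "SELECT `date`, SUM(sale_amount) AS total_sale FROM daily_sale GROUP BY `date`")
      .collect()
      .foreach(println)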

Execution output


[email protected]:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.DailySale  --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
17/03/06 20:55:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/06 20:55:21 WARN SparkConf: 
SPARK_CLASSPATH was detected (set to ':/home/hadoop/bigdata/hive/lib/mysql-connector-java-5.1.26-bin.jar').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
        
17/03/06 20:55:21 WARN SparkConf: Setting 'spark.executor.extraClassPath' to ':/home/hadoop/bigdata/hive/lib/mysql-connector-java-5.1.26-bin.jar' as a work-around.
17/03/06 20:55:21 WARN SparkConf: Setting 'spark.driver.extraClassPath' to ':/home/hadoop/bigdata/hive/lib/mysql-connector-java-5.1.26-bin.jar' as a work-around.
17/03/06 20:55:23 INFO Slf4jLogger: Slf4jLogger started
17/03/06 20:55:23 INFO Remoting: Starting remoting
17/03/06 20:55:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:58765]
17/03/06 20:55:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/03/06 20:55:26 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[2017-02-01,269]
[2017-02-02,56]
17/03/06 20:55:51 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/03/06 20:55:51 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
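
For 2017-02-01 the total is 55 + 23 + 78 + 113 = 269 (the record with the empty userid was filtered out), and for 2017-02-02 it is 56.

For reference, here is a minimal sketch of the same job written against the Spark 2.x SparkSession API. This is for comparison only; the post itself targets the Spark 1.x SQLContext API, and the object name DailySale2x is hypothetical.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.sum

    object DailySale2x {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("dailysale").master("local").getOrCreate()
        import spark.implicits._

        // Same mock data as above: date, sale amount, userid
        val logs = Seq(
          "2017-02-01,55,1122",
          "2017-02-01,23,1133",
          "2017-02-01,15,",
          "2017-02-02,56,1155",
          "2017-02-01,78,1123",
          "2017-02-01,113,1144"
        )

        // Parse, drop incomplete records, and build a DataFrame via toDF,
        // which infers the schema from the tuple types
        val df = spark.sparkContext.parallelize(logs)
          .map(_.split(","))
          .filter(_.length == 3)
          .map(f => (f(0), f(1).toInt, f(2).toInt))
          .toDF("date", "sale_amount", "userid")

        df.groupBy("date")
          .agg(sum($"sale_amount").as("total_sale"))
          .show()

        spark.stop()
      }
    }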
