Hands-On Spark Case Study

1. Importing the data into Hive tables

## Import the user-viewing-record CSV under /opt into the Hive table UserViewInfo

The create-table SQL. **Note: if the fields were semicolon-separated, the delimiter could not be typed literally in the Hive CLI; an escape character would be needed (e.g. the octal escape '\073').**

```
create table UserViewInfo(
  TVID bigint,
  count_date string,
  TVNum int,
  TVName string,
  TVBeginTime double,
  TVEndTime double)
row format delimited fields terminated by ',';

load data local inpath '/opt/TVProject/UserViewInfo.csv' overwrite into table UserViewInfo;
```

## Import the user-info CSV under /opt into the Hive table UserInfo

**Note: importing date-typed data directly is problematic, so all dates are loaded as string here and parsed later in code.**

```
create table UserInfo(
  UserID bigint,
  TVBrand string,
  UserStatus string,
  ChangeTime bigint,
  PreMoney int,
  Package string,
  XiaoShouPin string,
  ZiFei string,
  BeginTime string,
  EffetiveTime string,
  InvalidTime string,
  TVID bigint)
row format delimited fields terminated by ',';

load data local inpath '/opt/TVProject/UserInfo.csv' overwrite into table UserInfo;
```
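Since the date columns land as strings, they still need to be parsed after the load. Below is a minimal sketch of that step; the table name `tv.userinfo` and the `yyyy/MM/dd` date pattern are assumptions and must be adjusted to the actual database and CSV format.

```
// Sketch only: parse UserInfo's string date columns after the load.
// ASSUMPTIONS: the Hive table is tv.userinfo; dates look like "2017/08/31".
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ParseUserInfoDates {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    val df = spark.read.table("tv.userinfo")
    // to_date with an explicit pattern converts string -> DateType
    val parsed = df
      .withColumn("BeginDate", to_date(col("BeginTime"), "yyyy/MM/dd"))
      .withColumn("InvalidDate", to_date(col("InvalidTime"), "yyyy/MM/dd"))
    parsed.select("UserID", "BeginDate", "InvalidDate").show(5)
    spark.stop()
  }
}
```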

2. Counting users in UserViewInfo whose watch time exceeds half an hour

Create a project and add a new Scala object.

Scala code; this program demonstrates how to define a custom UDF to build a new column.
```
package spark

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object UserViewInfo {
  // Columns: set-top box ID, stat date, channel number, channel name,
  // watch start time, watch end time
  // e.g. 10001,20170831,1,CCTV-1 HD,42978.85,42978.85139
  // case class UserView(TVID: String, CountDate: Long, TVNum: Int,
  //   TVName: String, TVBeginTime: Double, TVEndTime: Double)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    val df_ViewData = spark.read.table("tv.userviewinfo")

    // Watch duration in minutes; assumes "HH:MM"-style time strings
    def getTime(beginTimeStr: String, endTimeStr: String): Int = {
      val bstr: Array[String] = beginTimeStr.split(":")
      val estr: Array[String] = endTimeStr.split(":")
      estr(0).toInt * 60 + estr(1).toInt - (bstr(0).toInt * 60 + bstr(1).toInt)
    }

    val getLabel_udf = udf((x: String, y: String) => getTime(x, y))
    val result = df_ViewData
      .withColumn("WatchTime", getLabel_udf(col("TVBeginTime"), col("TVEndTime")))
      .filter("WatchTime > 30")
    result.show(10)
    println(result.count())
    result.write.mode("overwrite").saveAsTable("tv.test1")
  }
}
```
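Note that `getTime` assumes colon-separated "HH:MM" strings, while the sample row shows day-fraction serial numbers (42978.85). If the columns really are serials, something like the sketch below, dropped into the same `main` in place of the UDF above, would compute the duration instead. This is an assumption about the data format, not the author's tested code.

```
// Alternative sketch, ASSUMING TVBeginTime/TVEndTime are day fractions
// (42978.85 = day 42978 + 0.85 of a day); the difference times 24*60
// gives minutes: 42978.85139 - 42978.85 = 0.00139 day ≈ 2 minutes.
// Reuses df_ViewData and the functions._ import from the block above.
val minutes_udf = udf((begin: Double, end: Double) =>
  math.round((end - begin) * 24 * 60).toInt)
val result = df_ViewData
  .withColumn("WatchTime", minutes_udf(col("TVBeginTime"), col("TVEndTime")))
  .filter("WatchTime > 30")
```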

Problems encountered while debugging

  • spark-submit kept failing; after a long investigation, the cause turned out to be that Hive had not been started. (After starting everything, always run jps to confirm the RunJar process is there.) Also, the code must include val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
  • VM memory is tight. A spark-shell opened in an Xshell session counts as a running Spark application; be sure to kill it on the master:8080 page, otherwise submitted jobs keep failing to get resources assigned.
  • Cluster submit command: spark-submit --master spark://master:7077 --class spark.UserViewInfo /opt/word.jar (a minimal build-definition sketch for packaging the jar follows below)
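For reference, packaging the project into /opt/word.jar could be done with sbt. This build file is a sketch only; the Scala and Spark versions are assumptions and must match what the cluster runs.

```
// build.sbt -- a minimal sketch; the versions here are ASSUMPTIONS
name := "word"
version := "0.1"
scalaVersion := "2.11.12"

// "provided" keeps Spark out of the jar, since the cluster supplies it
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.4.0" % "provided"
)
```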