1. 程式人生 > >Spark專案之 sparkDemo 六 SparkSession資料讀取測試

Spark專案之 sparkDemo 六 SparkSession資料讀取測試

專案介紹,這個專案主要做一個註冊歷史統計留存。我這裡不用sparkCore的方式來做(我覺得sparkCore的方式比較適合做資料整理),因為我覺得這種方式不是特別方便我這裡就直接使用 DataFrame,當然最直接的方式就是看官方的文件:

官方地址:http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.DataFrame

推薦兩篇參考部落格:

https://blog.csdn.net/sparkexpert/article/details/51042970

https://blog.csdn.net/dabokele/article/details/52802150

第一步,我們理解一下需求,我們要根據訪問日誌表來分析出今日訪問的使用者是哪天註冊的,然後再看留存的天數,所以我們要把今日訪問的日誌資料進行篩選和去重,然後和登錄檔的資料進行一個join,然後篩選出留存天數的使用者數量。下面我就進入程式碼。

我們先一個最簡單的方法:第一步,我們先把資料匯入hdfs或者是hive,用sqoop匯入請參考https://blog.csdn.net/lx1309244704/article/details/83892399,資料匯入成功後我們就可以獲取資料了,我在安裝spark說過,我沒有把hadoop的環境配置進去,那麼我們訪問hdfs的資料就需要加入他的全地址例如:hdfs://192.168.131.155:9000/input/ceshi.txt。

下面我進行一個測試test。

import org.apache.spark.sql.SparkSession


object test {
  def main(args: Array[String]): Unit = {
    val targetDay = "2018-10-20"
    
    val spark = SparkSession.builder().appName("shadowsocks").enableHiveSupport().getOrCreate()//讀取hive的資料
    import spark.implicits._
    val regTableDF = spark.read.table("shadowsocks_log").where($"login_time" === targetDay).select($"id", $"ip_send", $"login_time").distinct
    regTableDF.show()
    spark.close()
  }
}

我們寫一句就打個包然後就拿到環境上去執行,效率是不是會很慢。我們有兩種方式來測試,

第一種,把資料考到本地,執行這個scala就行了。

第二種:我們就用spark 給我提供的spark-shell來執行。

我這裡主要將spark-shell的方式,第一步我們需要先啟動Hadoop和spark,然後啟動spark-shell

./spark-shell --driver-class-path /home/hive/lib/mysql-connector-java-5.1.44.jar

然後一行一行的輸入,import的架包也需要輸入,如果你覺得太長,一行寫不完想換行,那麼請用 :paste形式,然後用ctrl +D的方式退出

 結果如下圖:

 我們查詢到了我們想要的資料。

專案demo:https://github.com/LX1309244704/sparkDemo