Spark專案之 sparkDemo 六 SparkSession資料讀取測試
專案介紹,這個專案主要做一個註冊歷史統計留存。我這裡不用sparkCore的方式來做(我覺得sparkCore的方式比較適合做資料整理),因為我覺得這種方式不是特別方便我這裡就直接使用 DataFrame,當然最直接的方式就是看官方的文件:
官方地址:http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.DataFrame
推薦兩篇參考部落格:
https://blog.csdn.net/sparkexpert/article/details/51042970
https://blog.csdn.net/dabokele/article/details/52802150
第一步,我們理解一下需求,我們要根據訪問日誌表來分析出今日訪問的使用者是哪天註冊的,然後再看留存的天數,所以我們要把今日訪問的日誌資料進行篩選和去重,然後和登錄檔的資料進行一個join,然後篩選出留存天數的使用者數量。下面我就進入程式碼。
我們先一個最簡單的方法:第一步,我們先把資料匯入hdfs或者是hive,用sqoop匯入請參考https://blog.csdn.net/lx1309244704/article/details/83892399,資料匯入成功後我們就可以獲取資料了,我在安裝spark說過,我沒有把hadoop的環境配置進去,那麼我們訪問hdfs的資料就需要加入他的全地址例如:hdfs://192.168.131.155:9000/input/ceshi.txt。
下面我進行一個測試test。
import org.apache.spark.sql.SparkSession object test { def main(args: Array[String]): Unit = { val targetDay = "2018-10-20" val spark = SparkSession.builder().appName("shadowsocks").enableHiveSupport().getOrCreate()//讀取hive的資料 import spark.implicits._ val regTableDF = spark.read.table("shadowsocks_log").where($"login_time" === targetDay).select($"id", $"ip_send", $"login_time").distinct regTableDF.show() spark.close() } }
我們寫一句就打個包然後就拿到環境上去執行,效率是不是會很慢。我們有兩種方式來測試,
第一種,把資料考到本地,執行這個scala就行了。
第二種:我們就用spark 給我提供的spark-shell來執行。
我這裡主要將spark-shell的方式,第一步我們需要先啟動Hadoop和spark,然後啟動spark-shell
./spark-shell --driver-class-path /home/hive/lib/mysql-connector-java-5.1.44.jar
然後一行一行的輸入,import的架包也需要輸入,如果你覺得太長,一行寫不完想換行,那麼請用 :paste形式,然後用ctrl +D的方式退出
結果如下圖:
我們查詢到了我們想要的資料。