Spark Project sparkDemo, Part 7: SparkDemo Walkthrough
阿新 · Published 2018-12-25
Next, let's split each data-query step into its own method and then combine the results. The readShadowSocks method reads the login data, readHistory reads the registration data, retainAddData analyzes and joins the two into the retention statistics, and outJDBCData writes the retention data into MySQL.
package com.shadowsocks

import java.util.Properties

import org.apache.spark.sql.{ DataFrame, SparkSession }
import org.apache.spark.sql.functions._

import com.util.DateUtil

object ReadTableCommon {

  /**
   * Read the users who logged in on the target day (distinct login_time / ip_send pairs).
   */
  def readShadowSocks(spark: SparkSession)(logTable: String, targetDay: String): DataFrame = {
    import spark.implicits._
    spark.read.table(logTable)
      .where($"login_time" === targetDay)
      .select($"login_time", $"ip_send")
      .distinct()
  }

  /**
   * Read the registration data needed for retention: the past 7 days plus the
   * 14-day and 30-day anchors relative to the target day.
   */
  def readHistory(spark: SparkSession)(registerTable: String, targetDay: String): DataFrame = {
    val (seven_day, fourteen_day, thirty_day) = (
      DateUtil.getDateByDay(targetDay, -7),
      DateUtil.getDateByDay(targetDay, -14),
      DateUtil.getDateByDay(targetDay, -30))
    import spark.implicits._
    spark.read.table(registerTable)
      .filter(($"register_time" <= targetDay && $"register_time" >= seven_day) ||
        $"register_time" === fourteen_day ||
        $"register_time" === thirty_day)
      .select($"id", $"ip_send", $"ip_end", $"register_time")
      .distinct()
  }

  /**
   * Common retention-statistics module: join registrations with logins on ip_send,
   * count retained users per registration day, and derive the retention interval.
   */
  def retainAddData(spark: SparkSession)(targetDay: String, regTableDF: DataFrame, logTableDF: DataFrame): DataFrame = {
    import spark.implicits._
    regTableDF.join(logTableDF, Seq("ip_send"))
      .groupBy("register_time")
      .agg(count("ip_send").as("retained_person_num"))
      .withColumn("target_day", when($"retained_person_num".isNotNull, targetDay))
      .select(
        $"target_day",
        $"register_time".as("retained_day"),
        datediff($"target_day", $"register_time").as("retained_day_num"),
        $"retained_person_num")
  }

  /**
   * Write a DataFrame into the MySQL database over JDBC.
   */
  def outJDBCData(spark: SparkSession)(url: String, regTableDF: DataFrame, tableName: String): Unit = {
    // val url = "jdbc:mysql://192.168.131.155:3306/hadoop?characterEncoding=UTF-8"
    val connectionProperties = new Properties()
    connectionProperties.setProperty("user", "root")     // database user name
    connectionProperties.setProperty("password", "root") // database password
    // regTableDF.write.jdbc(url, "shadowsocks_retain", connectionProperties) // create the table and insert the data (the table must not already exist)
    regTableDF.write.mode("append").jdbc(url, tableName, connectionProperties) // append the rows to the existing table
  }
}
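The DateUtil.getDateByDay helper that readHistory relies on is not shown in this post; it is assumed to take a yyyy-MM-dd date string and an offset in days and return the shifted date in the same format. A minimal sketch under that assumption (the real project class may differ):

package com.util

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical reconstruction of the DateUtil helper used by readHistory.
object DateUtil {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  // Shift a yyyy-MM-dd date string by the given number of days (negative values go back in time)
  def getDateByDay(day: String, offset: Int): String =
    LocalDate.parse(day, fmt).plusDays(offset).format(fmt)
}

With this, DateUtil.getDateByDay("2018-12-25", -7) returns "2018-12-18", which becomes the lower bound of the 7-day window filtered in readHistory.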
package com.shadowsocks

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ DateType, IntegerType, LongType, TimestampType }

object Shadowsocks {

  def main(args: Array[String]): Unit = {
    // Expected arguments: target day (yyyy-MM-dd), register table, login table, JDBC url
    val Array(targetDay, registerTable, logTable, url) = args

    val spark = SparkSession.builder()
      .appName("shadowsocks")
      .enableHiveSupport()
      .getOrCreate()

    // Login records of the target day and registration history for the retention windows
    val loginDF = ReadTableCommon.readShadowSocks(spark)(logTable, targetDay)
    val registerDF = ReadTableCommon.readHistory(spark)(registerTable, targetDay)

    // Join registrations with logins and compute the retention counts
    val retainDF = ReadTableCommon.retainAddData(spark)(targetDay, registerDF, loginDF)

    // Intended schema of the MySQL table; note that StructType.add returns a new
    // schema and does not change retainDF itself
    retainDF.schema
      .add("id", LongType, false)
      .add("target_day", DateType, true)
      .add("retained_day", DateType, true)
      .add("retained_day_num", IntegerType, true)
      .add("retained_person_num", IntegerType, true)
      .add("create_time", TimestampType, false)

    ReadTableCommon.outJDBCData(spark)(url, retainDF, "shadowsocks_retain")

    spark.close()
  }
}
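After the job finishes, you can confirm what was appended by reading shadowsocks_retain back through the same JDBC connection. The url, user, and password below are just the sample values from outJDBCData, not fixed project settings; a minimal read-back sketch:

import java.util.Properties

import org.apache.spark.sql.SparkSession

object CheckRetain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("check-retain").getOrCreate()

    // Same sample connection settings as in ReadTableCommon.outJDBCData
    val url = "jdbc:mysql://192.168.131.155:3306/hadoop?characterEncoding=UTF-8"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "root")

    // Read the retention table written by the job and print a sample of rows
    spark.read.jdbc(url, "shadowsocks_retain", props)
      .orderBy("target_day", "retained_day")
      .show(20, false)

    spark.close()
  }
}

The same pattern is handy while iterating on retainAddData: write to a scratch table name first, inspect it with this reader, and only then point outJDBCData at shadowsocks_retain.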