Hive Syntax: Consecutive N-Day Logins

Finding consecutive N-day logins with SQL

Building the test data

create table dwd.login_log as
select 1 as user_id, "2020-01-01" as login_date
union all
select 1 as user_id, "2020-01-02" as login_date
union all
select 1 as user_id, "2020-01-07" as login_date
union all
select 1 as user_id, "2020-01-08" as login_date
union all
select 1 as user_id, "2020-01-09" as login_date
union all
select 1 as user_id, "2020-01-10" as login_date
union all
select 2 as user_id, "2020-01-01" as login_date
union all
select 2 as user_id, "2020-01-02" as login_date
union all
select 2 as user_id, "2020-01-04" as login_date;

If the raw dates are not in the standard yyyy-MM-dd format, convert them while building the table:

create table dwd.login_log as
select user_id,to_date(from_unixtime(UNIX_TIMESTAMP(login_date,'yyyyMMdd'))) as login_date -- 'yyyyMMdd' is an example; use the raw data's actual format
from tmp.login_log; -- the tmp database holds the raw data
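Before running the CTAS, it can help to check that a pattern really parses the raw values. Below is a minimal Scala sketch of the same conversion for a single value using java.time, roughly what the to_date(from_unixtime(unix_timestamp(...))) chain does. NormalizeDate and normalize are hypothetical names, and yyyyMMdd as the raw pattern is only an assumption.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper: parse a raw date with its actual pattern, emit yyyy-MM-dd.
object NormalizeDate {
  // rawPattern = "yyyyMMdd" is an assumed default; pass the real format of tmp.login_log.
  def normalize(raw: String, rawPattern: String = "yyyyMMdd"): String =
    LocalDate
      .parse(raw, DateTimeFormatter.ofPattern(rawPattern))
      .format(DateTimeFormatter.ISO_LOCAL_DATE)

  def main(args: Array[String]): Unit = {
    println(normalize("20200101"))                 // 2020-01-01
    println(normalize("2020/01/07", "yyyy/MM/dd")) // 2020-01-07
  }
}
```

A value that does not match the pattern throws a DateTimeParseException here, so a wrong pattern shows up immediately.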

1. Using the lag/lead window functions with datediff

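  • For example, to find 3 consecutive login days, fetch each day's previous and next rows and require now - lag = lead - now = 1; a user with a longer streak matches on several middle days, which is why the outer query groups by user_id;
  • For longer streaks, fetch more rows (larger offsets), or express every comparison with lag only (or lead only);
  • The query assumes one row per user per day; if a user can log in several times a day, deduplicate first with select distinct user_id, login_date;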
  • datediff(date1, date2) - Returns the number of days between date1 and date2
select user_id 
from 
  (select user_id
  from
      (select user_id,
            lag(login_date,1) over(partition by user_id order by login_date) as lag_login_date,
            login_date,
            lead(login_date,1) over(partition by user_id order by login_date) as lead_login_date
      from dwd.login_log)t1
  where datediff(login_date,lag_login_date)=1 and datediff(lead_login_date,login_date)=1)t2
group by user_id;
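To see what the query checks row by row, here is an in-memory Scala sketch over one user's dates; LagLeadCheck and its methods are hypothetical names, and java.time stands in for Hive's date functions. hasNDayStreak shows one way to follow the second bullet with lag only, at offset N - 1: once the dates are distinct and sorted, N - 1 gaps of at least one day each can only add up to N - 1 days if every gap is exactly one day.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// In-memory sketch of the lag/lead check for one user's login dates.
object LagLeadCheck {
  // Days from start to end, in the spirit of Hive's datediff(end, start).
  def datediff(end: LocalDate, start: LocalDate): Long = ChronoUnit.DAYS.between(start, end)

  // One row per day, ordered by login_date (yyyy-MM-dd strings sort chronologically).
  private def ordered(dates: Seq[String]): IndexedSeq[LocalDate] =
    dates.distinct.sorted.map(s => LocalDate.parse(s)).toIndexedSeq

  // Same condition as the query: datediff(now, lag) = 1 and datediff(lead, now) = 1.
  def hasThreeDayStreak(dates: Seq[String]): Boolean = {
    val d = ordered(dates)
    (1 until d.length - 1).exists { i =>
      datediff(d(i), d(i - 1)) == 1 && datediff(d(i + 1), d(i)) == 1
    }
  }

  // Lag-only variant for any n >= 1: datediff(now, lag(now, n - 1)) = n - 1.
  def hasNDayStreak(dates: Seq[String], n: Int): Boolean = {
    val d = ordered(dates)
    (n - 1 until d.length).exists(i => datediff(d(i), d(i - (n - 1))) == n - 1)
  }
}
```

With the test data, user 1 qualifies for 3 and 4 days but not 5, and user 2 qualifies only for 2 days.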

2. Using the date_add function

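  • The general approach: partition by user_id and order by date, shift each date back by its rank (date_add(login_date, 1 - rk)), then count the rows that share each shifted date; consecutive days all land on the same date;
  • Its advantage is that it reports exactly how many days each streak lasted and when each streak happened;
  • Like the previous method, it assumes one row per user per day, so deduplicate first if needed;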
  • date_add(start_date, num_days) - Returns the date that is num_days after start_date
select user_id,con_login_date,count(*) nums
from
    (select user_id,login_date,rk,date_add(login_date,1 - rk) as con_login_date
    from 
        (select user_id,login_date,rank() over(partition by user_id order by login_date) rk
        from dwd.login_log)t1
    )t2
group by user_id,con_login_date
having count(*) >= 3;
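  • Result of subquery t1:
user_id | login_date | rk (order of login_date within the user)
1       | 2020-01-01 | 1
1       | 2020-01-02 | 2
1       | 2020-01-07 | 3
1       | 2020-01-08 | 4
1       | 2020-01-09 | 5
1       | 2020-01-10 | 6
2       | 2020-01-01 | 1
2       | 2020-01-02 | 2
2       | 2020-01-04 | 3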
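  • Result of subquery t2. The normalized date con_login_date (here date_add(login_date, 1 - rk)) can be defined differently, for example date_sub(login_date, rk), as long as it stays constant within a streak:
user_id | login_date | con_login_date (normalized date of the streak)
1       | 2020-01-01 | 2020-01-01
1       | 2020-01-02 | 2020-01-01
1       | 2020-01-07 | 2020-01-05
1       | 2020-01-08 | 2020-01-05
1       | 2020-01-09 | 2020-01-05
1       | 2020-01-10 | 2020-01-05
2       | 2020-01-01 | 2020-01-01
2       | 2020-01-02 | 2020-01-01
2       | 2020-01-04 | 2020-01-02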
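  • Result of the GROUP BY, before having count(*) >= 3 is applied. The third column can be read as a per-session count: the number of days in that streak. After the HAVING filter only (1, 2020-01-05, 4) remains, i.e. user 1's four-day streak from 2020-01-07 to 2020-01-10:
user_id | con_login_date | nums (days in this streak)
1       | 2020-01-01     | 2
1       | 2020-01-05     | 4
2       | 2020-01-01     | 2
2       | 2020-01-02     | 1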
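The same grouping can be reproduced in memory to check the tables above. A Scala sketch, assuming one user's dates arrive as yyyy-MM-dd strings; GapsAndIslands, streaks and longStreaks are hypothetical names. The index idx is 0-based, so subtracting idx days equals date_add(login_date, 1 - rk) with rk = idx + 1.

```scala
import java.time.LocalDate

// In-memory sketch of the date_add(login_date, 1 - rk) grouping for one user.
object GapsAndIslands {
  // One (normalized date, streak length) pair per streak, oldest first.
  def streaks(dates: Seq[String]): Seq[(String, Int)] = {
    // one row per day, ordered by login_date (yyyy-MM-dd strings sort chronologically)
    val sorted = dates.distinct.sorted.map(s => LocalDate.parse(s))
    // idx is 0-based, so rk = idx + 1 and date_add(d, 1 - rk) = d minus idx days
    sorted.zipWithIndex
      .groupBy { case (d, idx) => d.minusDays(idx.toLong) }
      .toSeq
      .sortBy { case (anchor, _) => anchor.toEpochDay }
      .map { case (anchor, rows) => (anchor.toString, rows.size) }
  }

  // Equivalent of having count(*) >= n.
  def longStreaks(dates: Seq[String], n: Int): Seq[(String, Int)] =
    streaks(dates).filter { case (_, days) => days >= n }
}
```

For user 1 this yields (2020-01-01, 2) and (2020-01-05, 4), matching the GROUP BY result, and longStreaks(dates, 3) plays the role of the HAVING clause. Grouping by the shifted date rather than comparing neighbours is what lets a single pass report every streak, not just whether one exists.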

Implementing it in code

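  • To find consecutive N-day logins in code, the core logic is: sort each user's dates; if a new date is exactly one day after the previous one, add it to the current run (kept in a HashMap or other collection, or simply counted); once the run reaches N days, output the user_id; on a gap, clear the run and start again.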
package cn.lang.spark_core

import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.SparkSession

// One login record; declared at top level (not inside main) so Spark can serialize it. Can also be registered with Kryo.
case class Login(uid: Int, loginTime: String)

object ContinuousLoginDays {

  // N: the number of consecutive login days to look for
  val N: Int = 3

  /** Get the date `last_n` days before biz_date (after it if last_n is negative).
   * Example: biz_date = 20200101, last_n = 1 returns 20191231.
   * An unparseable biz_date throws ParseException instead of silently using the current date. */
  def getLastNDate(biz_date: String,
                   date_format: String = "yyyyMMdd",
                   last_n: Int = 1): String = {
    val calendar: Calendar = Calendar.getInstance()
    val sdf = new SimpleDateFormat(date_format)
    calendar.setTime(sdf.parse(biz_date))
    calendar.add(Calendar.DATE, -last_n)
    sdf.format(calendar.getTime)
  }

  def main(args: Array[String]): Unit = {
    // env
    val spark: SparkSession = SparkSession
      .builder()
      .appName("ContinuousLoginDays")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    // source: could also be loaded from Hive (with Hive support enabled) or from Parquet files (with a defined schema).
    // Assumes a text table using Hive's default field delimiter, Ctrl-A.
    val source = sc.textFile("/user/hive/warehouse/dwd/login_log")

    // transform
    val result = source
      .map(_.split("\u0001"))
      .map(fields => Login(fields(0).toInt, fields(1)))
      .groupBy(_.uid) // RDD[(Int, Iterable[Login])]
      .map { case (uid, userLogins) =>
        // flags whether this uid has at least N consecutive login days
        var continuousLoginN = false

        // distinct dates in ascending order (yyyy-MM-dd strings sort chronologically)
        val loginDates = userLogins.map(_.loginTime).toSeq.distinct.sorted

        var lastLoginTime: String = ""
        var loginDays: Int = 0

        loginDates.foreach { loginTime =>
          if (lastLoginTime.nonEmpty && getLastNDate(loginTime, "yyyy-MM-dd") == lastLoginTime) {
            loginDays += 1 // the previous day was a login day: extend the run
          } else {
            loginDays = 1 // first date, or a gap: start a new run
          }
          lastLoginTime = loginTime
          if (loginDays >= N) continuousLoginN = true
        }

        /** The runs could also be kept in a collection here;
         * this version simply flags whether the user logged in on N consecutive days.
         */
        (uid, continuousLoginN)
      }
      .filter(_._2)
      .map(_._1)
    // sink: collect to the driver so the output is printed locally
    result.collect().foreach(println)
    spark.stop()
  }
}
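The Spark job is awkward to unit-test, so here is a Spark-free sketch of the same per-user scan that keeps each run in a collection, as the comment in the job suggests. It swaps Calendar/SimpleDateFormat for java.time.LocalDate; LoginRuns, runs and usersWithStreak are hypothetical names, and input dates are assumed to be yyyy-MM-dd strings.

```scala
import java.time.LocalDate
import scala.collection.mutable.ListBuffer

// Spark-free sketch of the per-user scan; each run is kept in a collection.
object LoginRuns {
  // All maximal runs of consecutive login days for one user, oldest first.
  def runs(dates: Seq[String]): List[List[String]] = {
    val all = ListBuffer.empty[List[String]]
    val current = ListBuffer.empty[LocalDate]
    for (d <- dates.distinct.sorted.map(s => LocalDate.parse(s))) {
      // a gap (d is not the day after the run's last date) closes the current run
      if (current.nonEmpty && current.last.plusDays(1) != d) {
        all += current.map(_.toString).toList
        current.clear()
      }
      current += d
    }
    if (current.nonEmpty) all += current.map(_.toString).toList
    all.toList
  }

  // Users with at least one run of n or more days, in ascending uid order.
  def usersWithStreak(logins: Seq[(Int, String)], n: Int): Seq[Int] =
    logins
      .groupBy { case (uid, _) => uid }
      .toSeq
      .sortBy { case (uid, _) => uid }
      .collect { case (uid, rows) if runs(rows.map(_._2)).exists(_.size >= n) => uid }
}
```

For the test data, usersWithStreak(logins, 3) returns Seq(1), the same user the SQL queries find. Because LocalDate.plusDays handles month ends, 2020-01-31 followed by 2020-02-01 counts as one run.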