1. 程式人生 > 實用技巧 >spark之Markov馬爾可夫智慧郵件預測

spark之Markov馬爾可夫智慧郵件預測

一.來源

  此專案來源《Data Algorithms Recipes for Scaling Up with Hadoop and Spark》第11章,本程式利用spark3.0以及java8進行改寫,

改寫的有:

  1.利用spark3.0與java8

  2.直接利用spark生成最終的狀態轉移矩陣,不用分開統計

  3.利用python載入狀態轉移矩陣進行預測

二.目的

  使用者的購買行為看起來是沒有規律可循的,但其實從時間有序的角度看,也許是有規律可循的,例如,使用者可能每一個月發工資時購買得多,每年某個時間(雙十一、生日)等購買得比較多
馬爾科夫模型能夠挖掘出時間上的規律,假設我們能夠根據使用者上一次購買記錄推測其下一次購買時間,就可以在推測時間向其傳送郵件進行營銷
至於營銷的商品內容,可以根據其他推薦演算法的結果。

三.程式

完整程式見,Markov馬爾可夫智慧郵件預測(https://github.com/jiangnanboy/spark_tutorial)

1.利用spark統計狀態轉移矩陣

    /**
     * 建立狀態轉移概率矩陣
     * @param session
     */
    public static void buildStateTransitionMatrix(SparkSession session, Broadcast<Map<String, String>> broadcastStatesMap, Broadcast<List<Tuple2<String, List<Double>>>> broadcastInitStateList) {
        
//customerID,transactionID,purchaseDate,amount(顧客ID,交易ID,交易日期,金額) String path = PropertiesReader.get("intermediate_smart_email_txt"); JavaRDD<String> javaRDD = session.read().textFile(path).toJavaRDD().coalesce(10); //key=customerID,v=(purchaseDate,amount) JavaPairRDD<String, Tuple2<Long, Integer>> javaPairRDD = javaRDD.mapToPair(line -> { String[] tokens
= StringUtils.split(line, ","); if(4 != tokens.length) { return null; } long date = DateUtils.parseDate(tokens[2], "yyyy-MM-dd").getTime(); int amount = Integer.parseInt(tokens[3]); Tuple2<Long, Integer> t2 = new Tuple2<>(date, amount); return new Tuple2<>(tokens[0], t2); }); //group by customerID JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = javaPairRDD.groupByKey(); //建立狀態序列 JavaPairRDD<String, List<String>> stateSequence = customerRDD.mapValues(dateAndAmount -> { List<Tuple2<Long, Integer>> list = toList(dateAndAmount); Collections.sort(list, TupleComparatorAscending.INSTANCE);//對list按日期排序 return toStateSequence(list); }); /** * customerID, List<State> * 所有狀態的頻率為1 =》((fromState, toState),1) * | S1 S2 S3 ... *---+----------------------- *S1 | <probability-value> * | *S2 | * | *S3 | * | *...| */ JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> { List<String> states = s._2; List<Tuple2<Tuple2<String, String>, Integer>> mapOut = new ArrayList<>(); if((null == states) || (states.size() < 2)) { return Collections.emptyIterator(); } for(int i = 0; i < (states.size() - 1); i++) { String fromState = states.get(i); String toState = states.get(i+1); Tuple2<String, String> t2 = new Tuple2<>(fromState, toState); mapOut.add(new Tuple2<>(t2, 1)); } return mapOut.iterator(); }); // 統計所有狀態頻率: ((fromState, toState), frequence) JavaPairRDD<Tuple2<String, String>, Integer> fromStateToStateFrequence1 = model.reduceByKey((i1, i2) -> i1 + i2); // ((fromState, toState), frequence) =》 (fromState, (toState, frequence)) JavaPairRDD<String, Tuple2<String, Integer>> fromStateToStateFrequence2 = fromStateToStateFrequence1.mapToPair(s -> { String key = s._1._1; Tuple2<String, Integer> value = new Tuple2<>(s._1._2, s._2); return new Tuple2<>(key, value); }); // group by fromState =》 fromState,List<Tuple2<toState, frequence>> => rowNumber,List<Tuple2<toState, frequence>> JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> groupState = fromStateToStateFrequence2.groupByKey().mapToPair(st2 -> { String rowNumber = broadcastStatesMap.getValue().get(st2._1); return new Tuple2<>(rowNumber, st2._2); }); //初始化矩陣狀態,value = 1.0 / size //List<Tuple2<String, List<Double>>> initStateList = initState(broadcastStatesMap.getValue().size()); JavaPairRDD<String, List<Double>> initStatePairRDD = JavaSparkContext.fromSparkContext(session.sparkContext()).parallelizePairs(broadcastInitStateList.getValue()); //initStatePairRDD.leftOuterJoin(groupState) JavaPairRDD<String, Tuple2<List<Double>, Optional<Iterable<Tuple2<String, Integer>>>>> joinPairRDD = initStatePairRDD.leftOuterJoin(groupState); //規範化轉移矩陣,使行的概率和為“1” JavaPairRDD<String, List<Double>> resultJavaPairRDD = joinPairRDD.mapValues(lot2 -> { int size = broadcastStatesMap.getValue().size(); List<Double> listDouble = lot2._1; Optional<Iterable<Tuple2<String, Integer>>> option = lot2._2; if(option.isPresent()) { Iterable<Tuple2<String, Integer>> toStateFrequence = option.get(); Iterator<Tuple2<String, Integer>> iter = toStateFrequence.iterator(); List<Tuple2<String, Integer>> iterList = new ArrayList<>(); int sum = 0; while(iter.hasNext()) { Tuple2<String, Integer> t2 = iter.next(); iterList.add(t2); sum += t2._2; } //加入平滑,防止概率為0 if(iterList.size() < size) { sum += size; for(int i = 0; i < listDouble.size(); i ++) { listDouble.set(i, 1.0/sum); } } for(int i = 0; i < iterList.size(); i++) { String stateNumber = broadcastStatesMap.getValue().get(iterList.get(i)._1); double numalizeValue = iterList.get(i)._2 / (double)sum; listDouble.set(Integer.parseInt(stateNumber), numalizeValue); } } else { return listDouble; } return listDouble; }); //1.利用sortByKey對轉移狀態排序,最終的狀態轉移概率矩陣 //List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.sortByKey().collect(); //2.利用takeOrdered對轉移狀態排序,最終的狀態轉移概率矩陣 List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.takeOrdered(broadcastStatesMap.getValue().size(), StateTupleComparatorAscending.INSTANCE); //列印轉移概率矩陣 for(Tuple2<String, List<Double>> s : stateResult) { StringBuilder sb = new StringBuilder(); sb.append(s._1).append(","); for(int i = 0; i < (s._2.size() - 1); i ++) { sb.append(s._2.get(i)).append(" "); } sb.append(s._2.get(s._2.size() - 1)); System.out.println(sb.toString()); } }

2.利用python載入狀態轉移矩陣並進行預測

import os
import time
import datetime

# 根據(spark)Markov.java統計出的馬爾可夫模型(model.txt),對validate.txt中的資料進行預測什麼時間應該發出營銷郵件
user_action = {}
model = []
#9大狀態
states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]

validate_path = os.path.join(os.getcwd(), "validate.txt")
model_path = os.path.join(os.getcwd(), "model.txt")

#讀取validate data
with open(validate_path, 'r', encoding='utf-8') as f_read:
    for line in f_read:
        items = line.strip().split(',')
        user_id = items[0]
        if user_id in user_action.keys():
            hist = user_action[user_id]
            lst = [items[2], items[-1]]
            hist.append(lst)
        else:
            hist = []
            hist.append([items[2], items[-1]])
            user_action[user_id] = hist
print(user_action)

#讀取model data
with open(model_path, 'r', encoding='utf-8') as f_read:
    for line in f_read:
        items = line.strip().split()
        row = []
        for item in items:
            row.append(float(item))
        model.append(row)
print(model)

#根據最近客戶的行為資料(至少兩次交易)make prediciton
for user_id,user_action_list in user_action.items():
    if len(user_action_list) < 2:
        continue
    state_sequence = []
    last_date = ''
    prior = user_action_list[0]
    for i in range(1, len(user_action_list)):
        current = user_action_list[i]
        prior_date = prior[0]
        current_date = current[0]

        #相隔天數
        prior_date = time.strptime(prior_date, '%Y-%m-%d')
        current_date = time.strptime(current_date, '%Y-%m-%d')
        prior_date = datetime.datetime(prior_date[0], prior_date[1], prior_date[2])
        current_date = datetime.datetime(current_date[0], current_date[1], current_date[2])
        days_diff = (current_date - prior_date).days

        dd = 'L'
        if days_diff < 30:
            dd = 'S'
        elif days_diff < 60:
            dd = 'M'

        #相差金額
        prior_amount = int(prior[1])
        current_amount = int(current[1])

        ad = 'G'
        if prior_amount < 0.9 * current_amount:
            ad = 'L'
        elif prior_amount < 1.1 * current_amount:
            ad = 'E'

        state_sequence.append(dd+ad)

        prior = current
        last_date = current_date

    if state_sequence:
        #根據最近一個狀態傳送營銷郵件日期
        last_state = state_sequence[-1]
        row_index = states.index(last_state)
        row_value = model[row_index] #轉移矩陣中行號為row_index的這一行值
        max_value = max(row_value) #row_value中最大值
        col_index = row_value.index(max_value) #max_value的索引號
        next_state = states[col_index]

        if next_state.startswith('S'):
            next_date = last_date + datetime.timedelta(15)
        elif next_state.startswith('E'):
            next_date = last_date + datetime.timedelta(45)
        else:
            next_date = last_date + datetime.timedelta(90)

    print('使用者:{},下一次營銷郵件:{}'.format(user_id, next_date))

3.狀態轉移矩陣

4.396976638863118E-6 4.396976638863118E-6 0.8062208425486636 0.15858575643387607 4.396976638863118E-6 4.396976638863118E-6 0.035153828227710626 4.396976638863118E-6 4.396976638863118E-6
0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111
0.8043794973142799 3.1671227323401235E-6 3.1671227323401235E-6 0.1594804651869869 3.1671227323401235E-6 3.1671227323401235E-6 0.03611153339414209 3.1671227323401235E-6 3.1671227323401235E-6
1.225940909648155E-5 1.225940909648155E-5 0.8156797842343999 1.225940909648155E-5 1.225940909648155E-5 0.15299742552408974 0.031212455559642024 1.225940909648155E-5 1.225940909648155E-5
0.010869565217391304 0.010869565217391304 0.7282608695652174 0.010869565217391304 0.010869565217391304 0.11956521739130435 0.05434782608695652 0.010869565217391304 0.010869565217391304
4.122521334047904E-5 4.122521334047904E-5 0.8190625386486375 0.14758626375891495 4.122521334047904E-5 4.122521334047904E-5 0.03298017067238323 4.122521334047904E-5 4.122521334047904E-5
4.703226413319537E-5 4.703226413319537E-5 0.8348697206283511 4.703226413319537E-5 4.703226413319537E-5 0.14081459881478695 4.703226413319537E-5 4.703226413319537E-5 0.023845357915530052
0.017857142857142856 0.017857142857142856 0.6785714285714286 0.017857142857142856 0.017857142857142856 0.125 0.017857142857142856 0.017857142857142856 0.03571428571428571
5.083884087442806E-4 5.083884087442806E-4 0.7961362480935434 5.083884087442806E-4 5.083884087442806E-4 0.16573462125063548 0.03304524656837824 5.083884087442806E-4 5.083884087442806E-4

4.預測結果

使用者:user1, 預測下次傳送時間:2020-01-22 00:00:00
使用者:user2, 預測下次傳送時間:2020-02-16 00:00:00
使用者:user3, 預測下次傳送時間:2020-01-25 00:00:00
使用者:user4, 預測下次傳送時間:2020-04-05 00:00:00
使用者:user5, 預測下次傳送時間:2020-01-30 00:00:00
使用者:user6, 預測下次傳送時間:2020-01-16 00:00:00