spark之Markov馬爾可夫智慧郵件預測
阿新 • • 發佈:2020-11-22
一.來源
此專案來源於《Data Algorithms: Recipes for Scaling Up with Hadoop and Spark》第11章,本程式以 Spark 3.0 與 Java 8 進行改寫。
改寫的有:
1.利用spark3.0與java8
2.直接利用spark生成最終的狀態轉移矩陣,不用分開統計
3.利用python載入狀態轉移矩陣進行預測
二.目的
使用者的購買行為看似無規律可循,但若按時間順序觀察,往往存在一定的規律。例如,使用者可能在每月發薪時購買較多,或在每年的特定時間(如雙十一、生日)購買較多。
馬爾可夫模型能夠挖掘出時間上的規律:若能根據使用者最近的購買記錄推測其下一次購買時間,就可以在推測的時間向其傳送營銷郵件。
至於營銷的商品內容,則可以參考其他推薦演算法的結果。
三.程式
完整程式見:Markov馬爾可夫智慧郵件預測(https://github.com/jiangnanboy/spark_tutorial)
1.利用spark統計狀態轉移矩陣
/** * 建立狀態轉移概率矩陣 * @param session */ public static void buildStateTransitionMatrix(SparkSession session, Broadcast<Map<String, String>> broadcastStatesMap, Broadcast<List<Tuple2<String, List<Double>>>> broadcastInitStateList) {//customerID,transactionID,purchaseDate,amount(顧客ID,交易ID,交易日期,金額) String path = PropertiesReader.get("intermediate_smart_email_txt"); JavaRDD<String> javaRDD = session.read().textFile(path).toJavaRDD().coalesce(10); //key=customerID,v=(purchaseDate,amount) JavaPairRDD<String, Tuple2<Long, Integer>> javaPairRDD = javaRDD.mapToPair(line -> { String[] tokens= StringUtils.split(line, ","); if(4 != tokens.length) { return null; } long date = DateUtils.parseDate(tokens[2], "yyyy-MM-dd").getTime(); int amount = Integer.parseInt(tokens[3]); Tuple2<Long, Integer> t2 = new Tuple2<>(date, amount); return new Tuple2<>(tokens[0], t2); }); //group by customerID JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = javaPairRDD.groupByKey(); //建立狀態序列 JavaPairRDD<String, List<String>> stateSequence = customerRDD.mapValues(dateAndAmount -> { List<Tuple2<Long, Integer>> list = toList(dateAndAmount); Collections.sort(list, TupleComparatorAscending.INSTANCE);//對list按日期排序 return toStateSequence(list); }); /** * customerID, List<State> * 所有狀態的頻率為1 =》((fromState, toState),1) * | S1 S2 S3 ... 
*---+----------------------- *S1 | <probability-value> * | *S2 | * | *S3 | * | *...| */ JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> { List<String> states = s._2; List<Tuple2<Tuple2<String, String>, Integer>> mapOut = new ArrayList<>(); if((null == states) || (states.size() < 2)) { return Collections.emptyIterator(); } for(int i = 0; i < (states.size() - 1); i++) { String fromState = states.get(i); String toState = states.get(i+1); Tuple2<String, String> t2 = new Tuple2<>(fromState, toState); mapOut.add(new Tuple2<>(t2, 1)); } return mapOut.iterator(); }); // 統計所有狀態頻率: ((fromState, toState), frequence) JavaPairRDD<Tuple2<String, String>, Integer> fromStateToStateFrequence1 = model.reduceByKey((i1, i2) -> i1 + i2); // ((fromState, toState), frequence) =》 (fromState, (toState, frequence)) JavaPairRDD<String, Tuple2<String, Integer>> fromStateToStateFrequence2 = fromStateToStateFrequence1.mapToPair(s -> { String key = s._1._1; Tuple2<String, Integer> value = new Tuple2<>(s._1._2, s._2); return new Tuple2<>(key, value); }); // group by fromState =》 fromState,List<Tuple2<toState, frequence>> => rowNumber,List<Tuple2<toState, frequence>> JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> groupState = fromStateToStateFrequence2.groupByKey().mapToPair(st2 -> { String rowNumber = broadcastStatesMap.getValue().get(st2._1); return new Tuple2<>(rowNumber, st2._2); }); //初始化矩陣狀態,value = 1.0 / size //List<Tuple2<String, List<Double>>> initStateList = initState(broadcastStatesMap.getValue().size()); JavaPairRDD<String, List<Double>> initStatePairRDD = JavaSparkContext.fromSparkContext(session.sparkContext()).parallelizePairs(broadcastInitStateList.getValue()); //initStatePairRDD.leftOuterJoin(groupState) JavaPairRDD<String, Tuple2<List<Double>, Optional<Iterable<Tuple2<String, Integer>>>>> joinPairRDD = initStatePairRDD.leftOuterJoin(groupState); //規範化轉移矩陣,使行的概率和為“1” JavaPairRDD<String, List<Double>> resultJavaPairRDD = 
joinPairRDD.mapValues(lot2 -> { int size = broadcastStatesMap.getValue().size(); List<Double> listDouble = lot2._1; Optional<Iterable<Tuple2<String, Integer>>> option = lot2._2; if(option.isPresent()) { Iterable<Tuple2<String, Integer>> toStateFrequence = option.get(); Iterator<Tuple2<String, Integer>> iter = toStateFrequence.iterator(); List<Tuple2<String, Integer>> iterList = new ArrayList<>(); int sum = 0; while(iter.hasNext()) { Tuple2<String, Integer> t2 = iter.next(); iterList.add(t2); sum += t2._2; } //加入平滑,防止概率為0 if(iterList.size() < size) { sum += size; for(int i = 0; i < listDouble.size(); i ++) { listDouble.set(i, 1.0/sum); } } for(int i = 0; i < iterList.size(); i++) { String stateNumber = broadcastStatesMap.getValue().get(iterList.get(i)._1); double numalizeValue = iterList.get(i)._2 / (double)sum; listDouble.set(Integer.parseInt(stateNumber), numalizeValue); } } else { return listDouble; } return listDouble; }); //1.利用sortByKey對轉移狀態排序,最終的狀態轉移概率矩陣 //List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.sortByKey().collect(); //2.利用takeOrdered對轉移狀態排序,最終的狀態轉移概率矩陣 List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.takeOrdered(broadcastStatesMap.getValue().size(), StateTupleComparatorAscending.INSTANCE); //列印轉移概率矩陣 for(Tuple2<String, List<Double>> s : stateResult) { StringBuilder sb = new StringBuilder(); sb.append(s._1).append(","); for(int i = 0; i < (s._2.size() - 1); i ++) { sb.append(s._2.get(i)).append(" "); } sb.append(s._2.get(s._2.size() - 1)); System.out.println(sb.toString()); } }
2.利用python載入狀態轉移矩陣並進行預測
import os
import datetime

# Uses the Markov model (model.txt) produced by the Spark job (Markov.java) to predict, for each
# user in validate.txt, when the next marketing e-mail should be sent.

# The nine states. First letter: days since the previous purchase (S: < 30, M: 30-59, L: >= 60).
# Second letter: previous amount vs. current amount
# (L: prior < 0.9 * current, E: prior < 1.1 * current, G: otherwise).
# The order of this list is the row/column order of the transition matrix.
states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]

# Days to wait before mailing, keyed by the first letter of the predicted next state.
# (Previously the middle case tested startswith('E'), which no state matches, so every
# "M" prediction fell through to 90 days.)
NEXT_MAIL_DAYS = {'S': 15, 'M': 45, 'L': 90}
DEFAULT_MAIL_DAYS = 90

validate_path = os.path.join(os.getcwd(), "validate.txt")
model_path = os.path.join(os.getcwd(), "model.txt")


def parse_date(text):
    """Parse a 'YYYY-MM-DD' string into a datetime at midnight."""
    return datetime.datetime.strptime(text, '%Y-%m-%d')


def load_user_actions(path):
    """Read the validate data into {user_id: [[purchase_date, amount], ...]} (file order).

    Each line is split on ','; field 0 is the user id, field 2 the date and the last
    field the amount. Blank lines are skipped.
    """
    user_action = {}
    with open(path, 'r', encoding='utf-8') as f_read:
        for line in f_read:
            if not line.strip():
                continue
            items = line.strip().split(',')
            user_action.setdefault(items[0], []).append([items[2], items[-1]])
    return user_action


def parse_model_line(line):
    """Parse one transition-matrix row into a list of floats.

    Accepts bare rows ("p0 p1 ...") as well as rows exactly as printed by the Spark
    job ("rowNumber,p0 p1 ..."), whose leading "rowNumber," prefix is dropped.
    """
    text = line.strip()
    if ',' in text:
        text = text.split(',', 1)[1]
    return [float(item) for item in text.split()]


def load_model(path):
    """Read the transition matrix: one row of floats per non-blank line."""
    model = []
    with open(path, 'r', encoding='utf-8') as f_read:
        for line in f_read:
            if line.strip():
                model.append(parse_model_line(line))
    return model


def transition_state(prior, current):
    """Return the state for the step from `prior` to `current` ([date, amount] pairs)."""
    days_diff = (parse_date(current[0]) - parse_date(prior[0])).days
    if days_diff < 30:
        dd = 'S'
    elif days_diff < 60:
        dd = 'M'
    else:
        dd = 'L'

    prior_amount = int(prior[1])
    current_amount = int(current[1])
    if prior_amount < 0.9 * current_amount:
        ad = 'L'
    elif prior_amount < 1.1 * current_amount:
        ad = 'E'
    else:
        ad = 'G'
    return dd + ad


def predict_next_date(user_action_list, model):
    """Predict when to mail a user; return None if there are fewer than two transactions.

    Transactions are ordered by date first, as the Spark job does before building state
    sequences. Only the latest state matters: its matrix row gives the most likely next
    state, whose first letter selects the waiting period added to the latest purchase date.
    """
    if len(user_action_list) < 2:
        return None
    history = sorted(user_action_list, key=lambda action: parse_date(action[0]))
    last_state = transition_state(history[-2], history[-1])

    row_value = model[states.index(last_state)]  # transition probabilities out of last_state
    next_state = states[row_value.index(max(row_value))]  # most likely next state

    last_date = parse_date(history[-1][0])
    days = NEXT_MAIL_DAYS.get(next_state[0], DEFAULT_MAIL_DAYS)
    return last_date + datetime.timedelta(days)


def main():
    user_action = load_user_actions(validate_path)
    print(user_action)
    model = load_model(model_path)
    print(model)
    # Predict from each user's recent behaviour (needs at least two transactions).
    for user_id, user_action_list in user_action.items():
        next_date = predict_next_date(user_action_list, model)
        if next_date is not None:
            print('使用者:{},下一次營銷郵件:{}'.format(user_id, next_date))


if __name__ == '__main__':
    main()
3.狀態轉移矩陣(行與列依序對應狀態 SL, SE, SG, ML, ME, MG, LL, LE, LG)
4.396976638863118E-6 4.396976638863118E-6 0.8062208425486636 0.15858575643387607 4.396976638863118E-6 4.396976638863118E-6 0.035153828227710626 4.396976638863118E-6 4.396976638863118E-6
0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111
0.8043794973142799 3.1671227323401235E-6 3.1671227323401235E-6 0.1594804651869869 3.1671227323401235E-6 3.1671227323401235E-6 0.03611153339414209 3.1671227323401235E-6 3.1671227323401235E-6
1.225940909648155E-5 1.225940909648155E-5 0.8156797842343999 1.225940909648155E-5 1.225940909648155E-5 0.15299742552408974 0.031212455559642024 1.225940909648155E-5 1.225940909648155E-5
0.010869565217391304 0.010869565217391304 0.7282608695652174 0.010869565217391304 0.010869565217391304 0.11956521739130435 0.05434782608695652 0.010869565217391304 0.010869565217391304
4.122521334047904E-5 4.122521334047904E-5 0.8190625386486375 0.14758626375891495 4.122521334047904E-5 4.122521334047904E-5 0.03298017067238323 4.122521334047904E-5 4.122521334047904E-5
4.703226413319537E-5 4.703226413319537E-5 0.8348697206283511 4.703226413319537E-5 4.703226413319537E-5 0.14081459881478695 4.703226413319537E-5 4.703226413319537E-5 0.023845357915530052
0.017857142857142856 0.017857142857142856 0.6785714285714286 0.017857142857142856 0.017857142857142856 0.125 0.017857142857142856 0.017857142857142856 0.03571428571428571
5.083884087442806E-4 5.083884087442806E-4 0.7961362480935434 5.083884087442806E-4 5.083884087442806E-4 0.16573462125063548 0.03304524656837824 5.083884087442806E-4 5.083884087442806E-4
4.預測結果
使用者:user1, 預測下次傳送時間:2020-01-22 00:00:00
使用者:user2, 預測下次傳送時間:2020-02-16 00:00:00
使用者:user3, 預測下次傳送時間:2020-01-25 00:00:00
使用者:user4, 預測下次傳送時間:2020-04-05 00:00:00
使用者:user5, 預測下次傳送時間:2020-01-30 00:00:00
使用者:user6, 預測下次傳送時間:2020-01-16 00:00:00