1. 程式人生 > >Spark迭代計算之通話記錄分析

Spark迭代計算之通話記錄分析

 

 

 


 

 

 

資料探勘實驗報告

 

  Spark迭代計算之通話記錄分析  

 

 

 

 

 

 

 

 

 

二〇一八年十二月

 

 

 

通話記錄分析

一、專案背景

本專案課題是來自於大學生的專案結課作業,所以我所瞭解的相關的基於資料探勘的客戶通話資料分析專案正是為了解決從不同的角度對移動企業的收益情況進行分析,用不同方法進行呼叫特徵分析和品牌業務的預測,建立資料探勘的客戶通話資料分析系統,著重針對相關的通話記錄表、號碼資訊表等資料資源,利用資料探勘技術對客戶通話資訊進行分析,從而對高層次決策人員提供輔助決策支援。在現今激烈的電子資訊行業競爭環境下,有效利用資料分析手段,解決商務運營問題有著非常重要的意義。

二、專案介紹

(一)專案需求

1、顯示撥出電話的所有日期、電話號碼、通話時長、及通話型別

2、輸出每一日期的所有通話的次數

3、按時間遞增順序排列出每一電話號碼的通話記錄

4、查詢電話號碼18211360627的所有資訊

5、查詢所有電話的電話號碼、通話時長、型別

6、查詢2017-01-03的所有通話資訊

7、以(日期,(電話號碼,型別))巢狀鍵值對的形式顯示通話記錄

8、查詢每一天的通話記錄的總次數並依據日期的前後進行排序

9、查詢每一電話號碼的通話記錄的次數並且分組排序

10、按每一電話號碼的通話記錄時間遞增順序排列出它的通話記錄。

(二)專案實現(專案架構)

1、通過Flume工具將call_records表和number_field匯入到HDFS中儲存;

2、通過Loader工具將MySQL中的call_records表和number_field匯入Spark中分析;

2、使用MapReduce對HDFS中的資料進行清洗,清洗要求去除錯誤欄位,補充不完整欄位,並根據Spark分析需求取出所涉及欄位按照一定格式儲存到HDFS中。清洗之後要求保留如下欄位:

Call_record表中:type 通話型別;Call_duration  通話時長;Phone_number手機號;Phone_data  通話日期;

number_field維表中:id  編號;Number_bigint  欄位號;Card_type  電話卡的型別;City 城市;Area  區號;

3、將MapReduce清洗後的資料匯入Spark中,使用Spark對清洗後的資料和從MySQL中匯入的資料進行統計分析。具體統計分析內容見“專案需求”;

三、專案資料

本專案需要出來的資料包括本地MySQL資料庫中的call_record維表和number_field維表。

由老師提供專案所需要的相關資料,並且提前把它放在叢集外某個節點的指定目錄下(節點IP地址為10.51.46.108。指定目錄為/home/zhangyu/liwen);並生成專案所需相關資料以維表的方式存入MySQL資料庫中(資料庫所在節點的IP地址為10.51.46.108,可以通過SSH方式訪問,使用者名稱為zhangyu,密碼為zhangyu),生成call_record維表和number_field維表,通過這些操作得到最後想要的相關結果。

1、call_record維表

Type

通話型別

Call_duration

通話時長

Phone_number

手機號

Phone_data

通話日期

2、number_field維表

id

 編號

Number_bigint

欄位號

Card_type

電話卡的型別

City

城市

Area

       區號

 

 

四、程式碼實現

相關檔案的匯入以及前期的相關準備工作


val rdd1=sc.textFile("hdfs:///spark/call_records.txt")

val rdd2=sc.textFile("hdfs:///spark/number_field.txt")

val split = rdd1.filter(a => a.contains("撥出")).saveAsTextFile("/zz/cc")

 

 

 

 

val rdd1=sc.textFile("hdfs:///spark/call_records.txt")

val rdd2=sc.textFile("hdfs:///spark/number_field.txt")

rdd1.map(line=> (line.split('\t')(0),1)).reduceByKey(_+_).collect

 

val rdd1=sc.textFile("hdfs:///spark/call_records.txt")

val rdd2=sc.textFile("hdfs:///spark/number_field.txt")

rdd1.map(line => ( line.split('\t')(1).toString, line.split('\t')(0) ) ).sortByKey(true).collect

 

 

 

 

 

val rdd1=sc.textFile("hdfs:///spark/call_records.txt")

val rdd2=sc.textFile("hdfs:///spark/number_field.txt")

val split = rdd1.filter(a => a.contains("18211360627")).collect

val rdd1=sc.textFile("hdfs:///spark/call_records.txt")

val rdd2=sc.textFile("hdfs:///spark/number_field.txt")

val rdd11 =rdd1.map(line => (line.split('\t')(1),(line.split('\t')(2),line.split('\t')(3)) ) ).collect

 

 

 

 

 

val rdd1=sc.textFile("hdfs:///spark/call_records.txt")

val rdd2=sc.textFile("hdfs:///spark/number_field.txt")

val spilt = rdd1.filter(a => a.contains("2017-01-03")).collect

val rdd1=sc.textFile("hdfs:///spark/call_records.txt")

val rdd2=sc.textFile("hdfs:///spark/number_field.txt")

val rdd11 =rdd1.map(line => (line.split('\t')(0),(line.split('\t')(1),line.split('\t')(3)) ) ).collect

 

 

 

 

 

 

 

 

val rdd1=sc.textFile("hdfs:///spark/call_records.txt")

val rdd2=sc.textFile("hdfs:///spark/number_field.txt")

val rdd11=rdd1.map(line => ( line.split('\t')(1).toString,(line.split('\t')(0).toString, line.split('\t')(2) ) ) ).sortByKey(true).collect

val rdd1=sc.textFile("hdfs:///spark/call_records.txt")

val rdd2=sc.textFile("hdfs:///spark/number_field.txt")

rdd1.map(line=> (line.split('\t')(1),1)).reduceByKey(_+_).collect