1. 程式人生 > >電信客服分析實現思路

電信客服分析實現思路

電信專案:

一、idea專案構建

1、安裝jdk並配置環境變數

 

2、安裝maven,解壓離線倉庫,並設定settings

 conf目錄下的setttings.xml檔案複製到離線倉庫的m2目錄下,並修改mirror標籤以及離線倉庫路徑。

      設定idea工具的maven選項,涉及到4個地方:work offline,以及3個maven設定吧。注意留意:override選項。

   

3、新建ct主專案目錄(相當於eclipse的workset)

    一個專案對應一個資料夾,舉例:

    workspace:

        ct:(新建moduel)

           ct_producer:

           該專案的各種包

            ct_analysis:

            該專案的各種包

4、新建ct_producer 模組,用於資料生產程式碼的編寫或構建

   ** 構建該專案選擇maven,ct專案下所有的模組(module)都是maven工程。(maven要是用3.3.9的)

5、設定常用選項

    取消idea自動開啟之前專案的功能(搜尋reopen,關閉相關標籤即可)

    設定字型大小(Editor——font——size)進行設定

    設定字元編碼:搜尋:encoding,3個位置全部改為utf-8

    自動導包以及自動提示設定(搜尋auto,設定自動導包為ask,程式碼自動提示為first letter)

    尖叫提示:

    idea-setttings設定的是當前專案的配置(只針對當前專案生效)

    idea-file-others-default settings設定的是全域性預設配置(也就是說,以後新建專案都是按照這個預設配置)

     

二、資料生產

   1、新建Producer.java

            ** 初始化聯絡人集合用於隨機資料使用

            ** 隨機兩個電話號碼

            ** 隨機通話建立的時間,返回String,格式:yyyy-MM-dd HH:mm:ss

            ** 隨機通話持續時間

            ** 將產生的資料寫入到本地磁碟中(日誌檔案)

  1. 技術點

獲得list集合的隨機下標

獲得主叫電話以及姓名

int indexcaller=(int)(Math.random()*phoneList.size());

 

獲得被叫的電話以及姓名,如果主叫等於被叫就繼續迴圈,不等於就對被叫進行賦值

獲得隨機日期

//將帶格式的日期轉換成毫秒數
Date startdata=formateTime.parse(startTime);

 

獲得某時間區間範圍內的日期

long radomdata=startdata.getTime()+(long)((enddata.getTime()-startdata.getTime())*Math.random());

 

路徑是傳入進去的args[0]

 

三、資料消費(資料儲存)

flume:cloudera公司研發

            適合下游資料消費者不多的情況;

            適合資料安全性要求不高的操作;

            適合與Hadoop生態圈對接的操作。

kafka:linkedin公司研發·

            適合資料下游消費眾多的情況;

            適合資料安全性要求較高的操作(支援replication);

1、安裝執行zookeeper

2、安裝配置kafka,此時我使用的版本是2.10-0821

            ** 修改server.properties

3、啟動kafka叢集

        啟動kafka叢集,分別在3臺機器上執行:

  /home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /home/admin/modules/kafka_2.10-0.8.2.1/config/server.properties

          建立kafka主題:calllog

  /home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper linux01:2181 --topic calllog --create --replication-factor 1 --partitions 4

 ** 檢視主題列表

 $ /home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper linux01:2181 --list

 啟動kafka控制檯消費者,用於測試

 $ /home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper linux01:2181 --topic calllog --from-beginning

    

4、配置flume,用於監聽實時產生的資料檔案

  ** 建立flume的job配置檔案

  尖叫提示:由於在配置flume的過程中,涉及到了資料監聽讀取方式的操作“tail -F -c +0”,即每次讀取完整的檔案,所以修改了java程式碼中,輸出流的寫出方式為:非追加,即覆蓋檔案。

啟動flume服務,注意,該語句需要進如flume跟目錄下執行

$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobs/flume-kafka.conf

5、生產日誌

        

 

6、使用Java KafkaAPI讀取Kafka中快取的資料

   ** 通過https://mvnrepository.com/網站找到你需要使用的依賴

   ** 匯入依賴

   ** 建立包結構

   ** 建立kafka消費者類,同時建立配置檔案以及配置檔案的工具類

** 實現了將生產的日誌實時讀取到控制檯

 

7、成功拿到資料之後,使用Java HBaseAPI將資料放入Hbase表中

由於資料支援實時查詢所以選擇將資料存放在hbase表中,而hive是低延遲

   ** 拿到一條資料,我要把這條資料放到Hbase表中

   ** 建立工具類建立名稱空間配置檔案,表,等

** 建立DAO實現資料存放功能

         ** 思路沒有捋清楚:

   1、建立名稱空間

   2、建立表(不要先不要新增協處理器)(注意,需要預分割槽)

   3、建立rowkey生成方法,建立預分割槽

3.1rowkey是按照字典儲存,因此設定rowkey時,要充分利用排序特點,將經常一起讀取的資料儲存到一塊,將最近可能會被訪問的資料放到一塊。

  3.2預先建立一些空的Regions,這樣當資料寫入HBase的時候,會按照 Region分割槽情況,在進群內做資料的負載均衡。(保證資料在region中均衡分配)預分割槽返回的是一個位元組二維陣列

  1. 建立預分割槽號生成方法

將電話號碼的後四位與年月的後六位進行異或除以分割槽數,這樣設計使資料在region分佈更均勻。

   5、在HBaseDAO中的構造方法裡,初始化名稱空間,初始化表(注意判斷表是否存在)

   6、在HBaseDAO中建立put方法,用於存放資料

   7、在kafka取得資料時,使用HbaseDAO的例項化物件,呼叫put方法,將資料存入即可。

 

Hbase的API使用流程:

  1. 首先需要建立HbaseAdmin表的操作物件,使表進入到編輯模式
  2. 初始化表的物件載入Configuration配置檔案
  3. admin.tableExists(tableName) 判斷表是否存在
  4. NamespaceDescriptor ns = NamespaceDescriptor
            .create(namespace)建立名稱空間描述

admin.createNamespace(ns);建立名稱空間

 

5.HTableDescriptor tableDescriptor建立表的描述

admin.createTable(tableDescriptor, getSplitKeys(regions));建立表

6.通過配置檔案拿到值建立表的預分割槽

7.需要新增的內容

//rowKey  主叫  被叫  通話時間  通話時間的毫秒錶示 通話時長 主叫被叫的標誌
rowKey的設計要用到經常用到的資料rowkey是按照字典儲存,因此設定rowkey時,要充分利用排序特點,將經常一起讀取的資料儲存到一塊,將最近可能會被訪問的資料放到一塊。

//rowkey的設計 分割槽號 主叫 通話時間 被叫  主叫標誌 通話時長

 

8.新增協處理器(插入一條被叫資料)

admin.createTable之前將協處理器新增進去

tableDescriptor.addCoprocessor("hbase.CalleeWriteObserver");

協處理器在put的同時將新增協處理器的內容

協處理器操作也是需要得到表明,新增資料、與當前操作表進行比較、放入資料等操作

9、寫入資料到HBase中注意事項

            ** 先確保表是成功建立的

            ** 檢查Hbase各個節點均為正常,通過瀏覽器60010檢視

            ** maven導包,不要導錯了,不要重複導包,不要導錯版本

            ** 程式碼邏輯執行順序要注意

            ** 超時時間設定:

 *** kafka根目錄下的config目錄下,修改server.properties檔案對於zookeeper超時時間的限定。

*** 專案的resoureces目錄下的kafka.properties檔案中,關於zookeeper超時的設定。

 以上兩個值,設定都稍大一些,比如50000

            

10、優化資料儲存方案:使用協處理器

            1、同一條資料,儲存兩遍。

                rowkey:實現了針對某個人,查詢該人範圍時間內的所有通話記錄

                    分割槽號 + call1 + dateTime + call2 + flag + duration

                    15837312345_20170101000000_13733991234

            2、討論使用協處理器的原因

            3、操作過程

                ** 建立協處理器類:CalleeWriteObserver extends BaseRegionObserver

                ** 覆寫postPut方法

                ** 編寫程式碼一大堆(實現將被叫資料存入)

                ** 建立表方法中:表述器中新增你成功建立的協處理器

                ** 修改resources中hbase-site.xml配置檔案

                ** 打包

                ** 將包分別放於3臺機器中的hbase根目錄中的lib目錄下群發

                ** 3臺機器中的hbase-site.xml檔案註冊協處理器群發

<property>

 <name>hbase.coprocessor.region.classes</name>

 <value>com.china.coprocessor.CalleeWriteObserver</value>

</property

                ** 重啟hbase

                ** 測試

                ** 如果測試成功,記得把表初始化一下

 

執行語句:

1.啟動yarn

 

2.啟動hadoop

 

3.啟動hbase

 

4.啟動kafka

(1)啟動kafka的叢集

bin/kafka-server-start.sh -daemon config/server.properties

(2)建立kefka分割槽

bin/kafka-topics.sh --zookeeper had01:2181 --topic calllog --create --replication-factor 1 --partitions 3

(3)檢視分割槽

bin/kafka-topics.sh --zookeeper had01:2181 --list

(4)啟動消費者

bin/kafka-console-consumer.sh --zookeeper had01:2181 --topic calllog --from-beginning

5.啟動flum

bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/app/flume-1.8.0/calllog/flum-calllog-kafka.conf

6.開啟kafka消費者

java -cp /datas/ct_consumer-1.0-SNAPSHOT.jar:/datas/mvn/* kafka.HBaseConsumer

 

7.java生產者

java -cp ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /home/hadoop/call/calllog.csv

 

注意:執行的時候需要將idea的依賴包匯出來放到執行目錄下並指定

 

11、打包,執行消費資料方法

 方式一:

  java -Djava.ext.dirs=./lib/ -cp ct_consumer.jar com.china.kafka.HBaseConsumer

  方式二:

  windows:java -cp ./lib/*;ct_consumer.jar com.china.kafka.HBaseConsumer

                    linux:java -cp ./lib/*:ct_consumer.jar com.china.kafka.HBaseConsumer

  idea打包,並提交到linux執行

** 打包,並將打好的包以及第三方依賴整個拷貝出來

** 上傳該資料夾(ct_consumer_jar)到linux中

** 執行:java -Djava.ext.dirs=./lib/ -cp ct_consumer.jar com.china.kafka.HBaseConsumer

6、某個使用者,傳入指定的時間範圍,查詢該時間範圍內的該使用者的所有通話記錄(包含主叫和被叫)

                15837312345 2017-01-01 2017-05-01

                rowkey:01_15837312345_20171102181630_13737312345_1_0180

                ** scan.setStartRow

                        2017-01-01

                ** scan.setStopRow

                        2017-02-01

                ** 組裝rowkey

                    01_15837312345_201711

                    01_15837312345_201712

            7、Filter測試講解

 

 四、資料分析

   類的結構以及功能:

1.mapper----reduce------runner

mapper端的輸出-------kv設計類

key的組成分為兩個類聯絡人+日期------組合(有父類)電話號碼+時間

valve:通話次數+通話時長(有父類)

2.runner類輸入hbase------輸出mysql----

 

3.MySQLOutputFormat類實現主表資料的插入

輸出的形式1_1  1  1 次數 通話時長

 

4.DimensionConverterImpl:獲取聯絡人資訊以及時間的資訊的id

為了避免從mysql中重複拿資料所以需要用到lruCache 快取類,將常用的資料放到快取中,查詢id的話首先在快取中進行查詢,快取中沒有的到mysql中進行查詢。mysql中沒有的話就將資料插入然後再次進行查詢最後返回id資訊。

  1. 連線資料庫的工具類

連線資料庫,關閉資料庫

  1.  LRUCache工具類

 

類的具體實現

1、統計所有使用者每月通話記錄(通話次數,通話時長)

2、統計所有使用者每月通話記錄(通話次數,通話時長)

3、匯入Mysql建表語句(db_telecom.sql)

4、新建專案,構建包結構,建立能夠想到的需要使用的類,不需要任何實現

(1)JDBCutils:

本類是jdbc連線的工具類負責開啟資料庫的連線關閉資料庫的連線

(2)JDBCinstance:

不讓其他物件過多的建立資料庫的連線      

  1. base資料夾主要是kv的介面類方便向上轉型以及向下轉型
  2. key中存放key的分類以及key的結合的類

kv.key.ContactDimension

private String telephone;
private String name;

 

kv.key.DateDimension

private String year;
private String month;
private String day;

 

kv.key.CommDimension

private ContactDimension contactDimension=new ContactDimension();
private  DateDimension dateDimension=new DateDimension();

 

value類封裝的是通話時長以及通話次數的屬性

 

  1. mapper類的解析

在mapper階段主要分析的是key的值繼承tablemapper類從hbase中載入資料

過濾掉被調資料,使用主調資料的rowkey進行操作

將主叫資料年份、年月、年月日+通話時間

rowkey的被叫資料年份、年月、年月日+通話時間

分別通過context物件傳遞給reduce的階段

context(date主叫年+contact,通話時長)

context(date主叫月+contact,通話時長)

context(date主叫日+contact,通話時長)

context(date被叫年+contact,通話時長)

context(date被叫月+contact,通話時長)

context(date被叫日+contact,通話時長)

  1.  reduce類的實現

每個人的通話次數以及通話時長進行彙總

  1.  runner類的實現

a.輸入的路徑為hbase表

b.輸出的路徑是mysql表

c.runner類繼承的是Tool類,可以實現配置檔案的set方法以及get方法

d.實現的是HBaseConfiguration.create(conf)

e.run方法的描述

f.建立job以及job執行的主類

g.設定inputstream路徑

獲得操作表的admin操作物件

判斷表是否存在,不存在則丟擲異常,存在則掃描這張表的資料

//初始化mapper
TableMapReduceUtil.initTableMapperJob("ns_ct:calllog", scan,
        CountDurctionMapper.class, CommDimension.class, Text.class,
        job, true);

h.設定job的reduce

i.設定outputformat

關聯到MySQLoutputformat類

return job.waitForCompletion(true) ? 0 : 1;

j.執行run方法,args為mysql-connect工具

int status = ToolRunner.run(new countDurationRunner(), args);

(8)MySQLOutputFormat類的實現

繼承OutputFormat

  1. 定義輸出物件

OutputCommitter

  1.  初始化jdbc聯結器,建立作業

RecordWriter<CommDimension, CountDurationValue> getRecordWriter(TaskAttemptContext context)

return new MysqlRecordWriter(conn);

  1. MysqlRecordWriter物件的實現

本類實現向資料庫寫資料

獲得要寫入資料庫的所有的屬性

時間id以及聯絡人的id的獲取要通過DimensionConverterImpl類來實現

  1. 書寫sql語句
  2.  將sql語句載入到preparedStatement物件中(執行資料庫操作的介面)

設定preparedStatement物件的屬性

 preparedStatement.addBatch();批量執行sql語句

  1. 關閉資料庫的操作

g.執行作業的提交getOutputCommitter()重寫此方法

(9)返回聯絡人以及時間的id的類操作

a.  產生本類的日誌檔案

private static final Logger logger = LoggerFactory.getLogger(DimensionConverterImpl.class);

 

b.  物件執行緒化,每個執行緒管理自己的jdbc

private ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<>();

 

c. 構建記憶體快取物件
private LRUCache lruCache = new LRUCache(3000);

 

d.獲得時間物件或者聯絡人物件的toString的形式

String cacheKey = genCacheKey(dimension);

e. 快取中的資料是以鍵值對的形式存在的

lruCache.containsKey(cacheKey)

判斷是否包含key對應的值,如果包含的話則返回對應的值id

不包含的話則獲得查詢的sql的語句集合,包含插入的以及查詢的sql的語句

f.  先執行查詢操作

將語句放入preparedStatement 物件

設定引數,setArguments(preparedStatement, dimension);

返回結果,如果結果不為空返回對應的id如果為空繼續執行

g. 返回的結果為空

執行插入操作後,再次執行對應的查詢語句,返回對應的id

 

 

(10)問題總結:

        1、ComDimension構造方法中,例項化時間維度和聯絡人維度物件。

        2、MySQLOutputformat的close方法中,沒有關閉資源,關閉:JDBCUtil.close(conn, preparedStatement, null);

        3、Runner,Mapper,Reducer中的泛型,不要使用抽象類

        4、DimensionConverter中的genSQL方法寫反了,需要調換位置。

        5、DimensionConverter中設定JVM退出時,關閉資源,如下:Runtime.getRuntime().addShutdownHook(new Thread(() ->

JDBCUtil.close(threadLocal.get(), null, null)));

        6、Mysql的url連線一定要是具體的主機名或者IP地址

        7、DimensionConverter中的close方法關閉資料庫連線

        8、除錯時,打包jar,上傳到linux,拔掉網線,進行測試。

        9、資料庫連線到底何時關閉,要梳理清楚。解決 Too many connections;

                        MySQLOutputformat -- MysqlRecordWriter -- DimensionConverter

        10、mysql-driver包沒有匯入成功

 

執行語句

啟動hadoop

啟動zookeeper

啟動日誌檢測

mr-jobhistory-daemon.sh  start historyserver

啟動hbase

/opt/app/hadoop-2.7.2/bin/yarn jar /datas/ct_analysis-1.0-SNAPSHOT.jar runner.countDurationRunner -libjars ./mysql-connector-java-5.1.27-bin.jar

此執行的方法是將mysql-connector-java-5.1.27-bin.jar打包到程式中後可以這樣執行

/opt/app/hadoop-2.7.2/bin/yarn jar /datas/ct_analysis.jar runner.countDurationRunner

 

四、資料展示

1、展示資料所需要的欄位都有哪些:

call_sum, call_duration_sum, telephone, name, year, month, day

2、通話通話次數與通話時長,展示使用者關係。

3、通過表格,展示一個人當年所有的話單資訊。

4.ssm框架,用到了spring相當於配置檔案、和springMVC框架相當於controller層,dao層相當於mybatis層

 

類的實現:

(1)bean類的實現

   QueryInfo將前臺傳來的值封裝成的類

bean.CallLog,將後臺拿出來的值封裝成的類

  1. 前臺拿到資料,通過post的方式進行提交name+對應的值

由web.xml檔案-----applicationContext.xml檔案-----此檔案中包含對應的需要掃描的包------同時此檔案中還包括對應的返回路徑的時候對應的路徑

 

  1.  此時前端的資料到了controller層

前臺的結果通過引數封裝在 QueryInfo queryInfo物件中

將前端的資料封裝為HashMap集合用於給sql語句進行傳值

獲得dao層的物件

判斷前端返回過來的結果是否符合資料要求,不符合則提示輸入錯誤

呼叫callLogDAO類(dao層)進行資料的結果查詢

返回list<callLog>集合

 

獲得月份/日期的集合

 

獲得次數集合

 

獲得通話時長集合

 

將資料通過model物件載入到另一個頁面

  1.  CallLogDAO類的實現

getcallloglist(HashMap<String,String> paramsMap)

可以給命名引數設定值

private NamedParameterJdbcTemplate namedParameterJdbcTemplate ;

 

引數傳過來一個HashMap的集合

判斷傳過來的日期是符合使用者全年查詢還是全月的查詢

並組裝sql語句

BeanPropertyRowMapper<CallLog> beanPropertyRowMapper = new BeanPropertyRowMapper<>(CallLog.class);
List<CallLog> list = namedParameterJdbcTemplate.query(sql, paramsMap, beanPropertyRowMapper);

通過這兩句返回結果集

前端通過${requestScope.name}引數進行拿值進行處理

    

五.Kafka問題修復:

    初始化zookeeper

 

IDEA常用快捷鍵:

    alt + enter:智慧修復

    ctrl + alt + v:自動生成當前物件名

    ctrl + alt + t:自動撥出包裹選單

    ctrl + o:撥出覆寫選單

    ctrl + alt + l:格式化程式碼

    ctrl + shift + enter:自動補全當前行程式碼缺失的符號

 

IDEA方式打包工程:

    File--project stru-- arti--點選加號--copy to the output....--ok--ok--build--rebuild