電信客服分析實現思路
電信專案:
一、idea專案構建
1、安裝jdk並配置環境變數
2、安裝maven,解壓離線倉庫,並設定settings
conf目錄下的setttings.xml檔案複製到離線倉庫的m2目錄下,並修改mirror標籤以及離線倉庫路徑。
設定idea工具的maven選項,涉及到4個地方:work offline,以及3個maven設定吧。注意留意:override選項。
3、新建ct主專案目錄(相當於eclipse的workset)
一個專案對應一個資料夾,舉例:
workspace:
ct:(新建moduel)
ct_producer:
該專案的各種包
ct_analysis:
該專案的各種包
4、新建ct_producer 模組,用於資料生產程式碼的編寫或構建
** 構建該專案選擇maven,ct專案下所有的模組(module)都是maven工程。(maven要是用3.3.9的)
5、設定常用選項
取消idea自動開啟之前專案的功能(搜尋reopen,關閉相關標籤即可)
設定字型大小(Editor——font——size)進行設定
設定字元編碼:搜尋:encoding,3個位置全部改為utf-8
自動導包以及自動提示設定(搜尋auto,設定自動導包為ask,程式碼自動提示為first letter)
尖叫提示:
idea-setttings設定的是當前專案的配置(只針對當前專案生效)
idea-file-others-default settings設定的是全域性預設配置(也就是說,以後新建專案都是按照這個預設配置)
二、資料生產
1、新建Producer.java
** 初始化聯絡人集合用於隨機資料使用
** 隨機兩個電話號碼
** 隨機通話建立的時間,返回String,格式:yyyy-MM-dd HH:mm:ss
** 隨機通話持續時間
** 將產生的資料寫入到本地磁碟中(日誌檔案)
- 技術點
獲得list集合的隨機下標
獲得主叫電話以及姓名
int indexcaller=(int)(Math.random()*phoneList.size());
獲得被叫的電話以及姓名,如果主叫等於被叫就繼續迴圈,不等於就對被叫進行賦值
獲得隨機日期
//將帶格式的日期轉換成毫秒數
Date startdata=formateTime.parse(startTime);
獲得某時間區間範圍內的日期
long radomdata=startdata.getTime()+(long)((enddata.getTime()-startdata.getTime())*Math.random());
路徑是傳入進去的args[0]
三、資料消費(資料儲存)
flume:cloudera公司研發
適合下游資料消費者不多的情況;
適合資料安全性要求不高的操作;
適合與Hadoop生態圈對接的操作。
kafka:linkedin公司研發·
適合資料下游消費眾多的情況;
適合資料安全性要求較高的操作(支援replication);
1、安裝執行zookeeper
2、安裝配置kafka,此時我使用的版本是2.10-0821
** 修改server.properties
3、啟動kafka叢集
啟動kafka叢集,分別在3臺機器上執行:
/home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /home/admin/modules/kafka_2.10-0.8.2.1/config/server.properties
建立kafka主題:calllog
/home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper linux01:2181 --topic calllog --create --replication-factor 1 --partitions 4
** 檢視主題列表
$ /home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper linux01:2181 --list
啟動kafka控制檯消費者,用於測試
$ /home/admin/modules/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper linux01:2181 --topic calllog --from-beginning
4、配置flume,用於監聽實時產生的資料檔案
** 建立flume的job配置檔案
尖叫提示:由於在配置flume的過程中,涉及到了資料監聽讀取方式的操作“tail -F -c +0”,即每次讀取完整的檔案,所以修改了java程式碼中,輸出流的寫出方式為:非追加,即覆蓋檔案。
啟動flume服務,注意,該語句需要進如flume跟目錄下執行
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobs/flume-kafka.conf
5、生產日誌
6、使用Java KafkaAPI讀取Kafka中快取的資料
** 通過https://mvnrepository.com/網站找到你需要使用的依賴
** 匯入依賴
** 建立包結構
** 建立kafka消費者類,同時建立配置檔案以及配置檔案的工具類
** 實現了將生產的日誌實時讀取到控制檯
7、成功拿到資料之後,使用Java HBaseAPI將資料放入Hbase表中
由於資料支援實時查詢所以選擇將資料存放在hbase表中,而hive是低延遲
** 拿到一條資料,我要把這條資料放到Hbase表中
** 建立工具類建立名稱空間配置檔案,表,等
** 建立DAO實現資料存放功能
** 思路沒有捋清楚:
1、建立名稱空間
2、建立表(不要先不要新增協處理器)(注意,需要預分割槽)
3、建立rowkey生成方法,建立預分割槽
3.1rowkey是按照字典儲存,因此設定rowkey時,要充分利用排序特點,將經常一起讀取的資料儲存到一塊,將最近可能會被訪問的資料放到一塊。
3.2預先建立一些空的Regions,這樣當資料寫入HBase的時候,會按照 Region分割槽情況,在進群內做資料的負載均衡。(保證資料在region中均衡分配)預分割槽返回的是一個位元組二維陣列
- 建立預分割槽號生成方法
將電話號碼的後四位與年月的後六位進行異或除以分割槽數,這樣設計使資料在region分佈更均勻。
5、在HBaseDAO中的構造方法裡,初始化名稱空間,初始化表(注意判斷表是否存在)
6、在HBaseDAO中建立put方法,用於存放資料
7、在kafka取得資料時,使用HbaseDAO的例項化物件,呼叫put方法,將資料存入即可。
Hbase的API使用流程:
- 首先需要建立HbaseAdmin表的操作物件,使表進入到編輯模式
- 初始化表的物件載入Configuration配置檔案
- admin.tableExists(tableName) 判斷表是否存在
- NamespaceDescriptor ns = NamespaceDescriptor
.create(namespace)建立名稱空間描述
admin.createNamespace(ns);建立名稱空間
5.HTableDescriptor tableDescriptor建立表的描述
admin.createTable(tableDescriptor, getSplitKeys(regions));建立表
6.通過配置檔案拿到值建立表的預分割槽
7.需要新增的內容
//rowKey 主叫 被叫 通話時間 通話時間的毫秒錶示 通話時長 主叫被叫的標誌
rowKey的設計要用到經常用到的資料rowkey是按照字典儲存,因此設定rowkey時,要充分利用排序特點,將經常一起讀取的資料儲存到一塊,將最近可能會被訪問的資料放到一塊。
//rowkey的設計 分割槽號 主叫 通話時間 被叫 主叫標誌 通話時長
8.新增協處理器(插入一條被叫資料)
在admin.createTable之前將協處理器新增進去
tableDescriptor.addCoprocessor("hbase.CalleeWriteObserver");
協處理器在put的同時將新增協處理器的內容
協處理器操作也是需要得到表明,新增資料、與當前操作表進行比較、放入資料等操作
9、寫入資料到HBase中注意事項
** 先確保表是成功建立的
** 檢查Hbase各個節點均為正常,通過瀏覽器60010檢視
** maven導包,不要導錯了,不要重複導包,不要導錯版本
** 程式碼邏輯執行順序要注意
** 超時時間設定:
*** kafka根目錄下的config目錄下,修改server.properties檔案對於zookeeper超時時間的限定。
*** 專案的resoureces目錄下的kafka.properties檔案中,關於zookeeper超時的設定。
以上兩個值,設定都稍大一些,比如50000
10、優化資料儲存方案:使用協處理器
1、同一條資料,儲存兩遍。
rowkey:實現了針對某個人,查詢該人範圍時間內的所有通話記錄
分割槽號 + call1 + dateTime + call2 + flag + duration
15837312345_20170101000000_13733991234
2、討論使用協處理器的原因
3、操作過程
** 建立協處理器類:CalleeWriteObserver extends BaseRegionObserver
** 覆寫postPut方法
** 編寫程式碼一大堆(實現將被叫資料存入)
** 建立表方法中:表述器中新增你成功建立的協處理器
** 修改resources中hbase-site.xml配置檔案
** 打包
** 將包分別放於3臺機器中的hbase根目錄中的lib目錄下(群發)
** 3臺機器中的hbase-site.xml檔案註冊協處理器(群發)
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.china.coprocessor.CalleeWriteObserver</value>
</property
** 重啟hbase
** 測試
** 如果測試成功,記得把表初始化一下
執行語句:
1.啟動yarn
2.啟動hadoop
3.啟動hbase
4.啟動kafka
(1)啟動kafka的叢集
bin/kafka-server-start.sh -daemon config/server.properties
(2)建立kefka分割槽
bin/kafka-topics.sh --zookeeper had01:2181 --topic calllog --create --replication-factor 1 --partitions 3
(3)檢視分割槽
bin/kafka-topics.sh --zookeeper had01:2181 --list
(4)啟動消費者
bin/kafka-console-consumer.sh --zookeeper had01:2181 --topic calllog --from-beginning
5.啟動flum
bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/app/flume-1.8.0/calllog/flum-calllog-kafka.conf
6.開啟kafka消費者
java -cp /datas/ct_consumer-1.0-SNAPSHOT.jar:/datas/mvn/* kafka.HBaseConsumer
7.java生產者
java -cp ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /home/hadoop/call/calllog.csv
注意:執行的時候需要將idea的依賴包匯出來放到執行目錄下並指定
11、打包,執行消費資料方法
方式一:
java -Djava.ext.dirs=./lib/ -cp ct_consumer.jar com.china.kafka.HBaseConsumer
方式二:
windows:java -cp ./lib/*;ct_consumer.jar com.china.kafka.HBaseConsumer
linux:java -cp ./lib/*:ct_consumer.jar com.china.kafka.HBaseConsumer
idea打包,並提交到linux執行
** 打包,並將打好的包以及第三方依賴整個拷貝出來
** 上傳該資料夾(ct_consumer_jar)到linux中
** 執行:java -Djava.ext.dirs=./lib/ -cp ct_consumer.jar com.china.kafka.HBaseConsumer
6、某個使用者,傳入指定的時間範圍,查詢該時間範圍內的該使用者的所有通話記錄(包含主叫和被叫)
15837312345 2017-01-01 2017-05-01
rowkey:01_15837312345_20171102181630_13737312345_1_0180
** scan.setStartRow
2017-01-01
** scan.setStopRow
2017-02-01
** 組裝rowkey
01_15837312345_201711
01_15837312345_201712
7、Filter測試講解
四、資料分析
類的結構以及功能:
1.mapper----reduce------runner
mapper端的輸出-------kv設計類
key的組成分為兩個類聯絡人+日期------組合(有父類)電話號碼+時間
valve:通話次數+通話時長(有父類)
2.runner類輸入hbase------輸出mysql----
3.MySQLOutputFormat類實現主表資料的插入
輸出的形式1_1 1 1 次數 通話時長
4.DimensionConverterImpl:獲取聯絡人資訊以及時間的資訊的id
為了避免從mysql中重複拿資料所以需要用到lruCache 快取類,將常用的資料放到快取中,查詢id的話首先在快取中進行查詢,快取中沒有的到mysql中進行查詢。mysql中沒有的話就將資料插入然後再次進行查詢最後返回id資訊。
- 連線資料庫的工具類
連線資料庫,關閉資料庫
- LRUCache工具類
類的具體實現
1、統計所有使用者每月通話記錄(通話次數,通話時長)
2、統計所有使用者每月通話記錄(通話次數,通話時長)
3、匯入Mysql建表語句(db_telecom.sql)
4、新建專案,構建包結構,建立能夠想到的需要使用的類,不需要任何實現
(1)JDBCutils:
本類是jdbc連線的工具類負責開啟資料庫的連線關閉資料庫的連線
(2)JDBCinstance:
不讓其他物件過多的建立資料庫的連線
- base資料夾主要是kv的介面類方便向上轉型以及向下轉型
- key中存放key的分類以及key的結合的類
kv.key.ContactDimension
private String telephone;
private String name;
kv.key.DateDimension
private String year;
private String month;
private String day;
kv.key.CommDimension
private ContactDimension contactDimension=new ContactDimension();
private DateDimension dateDimension=new DateDimension();
value類封裝的是通話時長以及通話次數的屬性
- mapper類的解析
在mapper階段主要分析的是key的值繼承tablemapper類從hbase中載入資料
過濾掉被調資料,使用主調資料的rowkey進行操作
將主叫資料年份、年月、年月日+通話時間
rowkey的被叫資料年份、年月、年月日+通話時間
分別通過context物件傳遞給reduce的階段
context(date主叫年+contact,通話時長)
context(date主叫月+contact,通話時長)
context(date主叫日+contact,通話時長)
context(date被叫年+contact,通話時長)
context(date被叫月+contact,通話時長)
context(date被叫日+contact,通話時長)
- reduce類的實現
每個人的通話次數以及通話時長進行彙總
- runner類的實現
a.輸入的路徑為hbase表
b.輸出的路徑是mysql表
c.runner類繼承的是Tool類,可以實現配置檔案的set方法以及get方法
d.實現的是HBaseConfiguration.create(conf)
e.run方法的描述
f.建立job以及job執行的主類
g.設定inputstream路徑
獲得操作表的admin操作物件
判斷表是否存在,不存在則丟擲異常,存在則掃描這張表的資料
//初始化mapper
TableMapReduceUtil.initTableMapperJob("ns_ct:calllog", scan,
CountDurctionMapper.class, CommDimension.class, Text.class,
job, true);
h.設定job的reduce
i.設定outputformat
關聯到MySQLoutputformat類
return job.waitForCompletion(true) ? 0 : 1;
j.執行run方法,args為mysql-connect工具
int status = ToolRunner.run(new countDurationRunner(), args);
(8)MySQLOutputFormat類的實現
繼承OutputFormat
- 定義輸出物件
OutputCommitter
- 初始化jdbc聯結器,建立作業
RecordWriter<CommDimension, CountDurationValue> getRecordWriter(TaskAttemptContext context)
return new MysqlRecordWriter(conn);
- MysqlRecordWriter物件的實現
本類實現向資料庫寫資料
獲得要寫入資料庫的所有的屬性
時間id以及聯絡人的id的獲取要通過DimensionConverterImpl類來實現
- 書寫sql語句
- 將sql語句載入到preparedStatement物件中(執行資料庫操作的介面)
設定preparedStatement物件的屬性
preparedStatement.addBatch();批量執行sql語句
- 關閉資料庫的操作
g.執行作業的提交getOutputCommitter()重寫此方法
(9)返回聯絡人以及時間的id的類操作
a. 產生本類的日誌檔案
private static final Logger logger = LoggerFactory.getLogger(DimensionConverterImpl.class);
b. 物件執行緒化,每個執行緒管理自己的jdbc
private ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<>();
c. 構建記憶體快取物件
private LRUCache lruCache = new LRUCache(3000);
d.獲得時間物件或者聯絡人物件的toString的形式
String cacheKey = genCacheKey(dimension);
e. 快取中的資料是以鍵值對的形式存在的
lruCache.containsKey(cacheKey)
判斷是否包含key對應的值,如果包含的話則返回對應的值id
不包含的話則獲得查詢的sql的語句集合,包含插入的以及查詢的sql的語句
f. 先執行查詢操作
將語句放入preparedStatement 物件
設定引數,setArguments(preparedStatement, dimension);
返回結果,如果結果不為空返回對應的id如果為空繼續執行
g. 返回的結果為空
執行插入操作後,再次執行對應的查詢語句,返回對應的id
(10)問題總結:
1、ComDimension構造方法中,例項化時間維度和聯絡人維度物件。
2、MySQLOutputformat的close方法中,沒有關閉資源,關閉:JDBCUtil.close(conn, preparedStatement, null);
3、Runner,Mapper,Reducer中的泛型,不要使用抽象類
4、DimensionConverter中的genSQL方法寫反了,需要調換位置。
5、DimensionConverter中設定JVM退出時,關閉資源,如下:Runtime.getRuntime().addShutdownHook(new Thread(() ->
JDBCUtil.close(threadLocal.get(), null, null)));
6、Mysql的url連線一定要是具體的主機名或者IP地址
7、DimensionConverter中的close方法關閉資料庫連線
8、除錯時,打包jar,上傳到linux,拔掉網線,進行測試。
9、資料庫連線到底何時關閉,要梳理清楚。解決 Too many connections;
MySQLOutputformat -- MysqlRecordWriter -- DimensionConverter
10、mysql-driver包沒有匯入成功
執行語句
啟動hadoop
啟動zookeeper
啟動日誌檢測
mr-jobhistory-daemon.sh start historyserver
啟動hbase
/opt/app/hadoop-2.7.2/bin/yarn jar /datas/ct_analysis-1.0-SNAPSHOT.jar runner.countDurationRunner -libjars ./mysql-connector-java-5.1.27-bin.jar
此執行的方法是將mysql-connector-java-5.1.27-bin.jar打包到程式中後可以這樣執行
/opt/app/hadoop-2.7.2/bin/yarn jar /datas/ct_analysis.jar runner.countDurationRunner
四、資料展示
1、展示資料所需要的欄位都有哪些:
call_sum, call_duration_sum, telephone, name, year, month, day
2、通話通話次數與通話時長,展示使用者關係。
3、通過表格,展示一個人當年所有的話單資訊。
4.ssm框架,用到了spring相當於配置檔案、和springMVC框架相當於controller層,dao層相當於mybatis層
類的實現:
(1)bean類的實現
QueryInfo,將前臺傳來的值封裝成的類
bean.CallLog,將後臺拿出來的值封裝成的類
- 前臺拿到資料,通過post的方式進行提交name+對應的值
由web.xml檔案-----applicationContext.xml檔案-----此檔案中包含對應的需要掃描的包------同時此檔案中還包括對應的返回路徑的時候對應的路徑
- 此時前端的資料到了controller層
前臺的結果通過引數封裝在 QueryInfo queryInfo物件中
將前端的資料封裝為HashMap集合用於給sql語句進行傳值
獲得dao層的物件
判斷前端返回過來的結果是否符合資料要求,不符合則提示輸入錯誤
呼叫callLogDAO類(dao層)進行資料的結果查詢
返回list<callLog>集合
獲得月份/日期的集合
獲得次數集合
獲得通話時長集合
將資料通過model物件載入到另一個頁面
- CallLogDAO類的實現
getcallloglist(HashMap<String,String> paramsMap)
可以給命名引數設定值
private NamedParameterJdbcTemplate namedParameterJdbcTemplate ;
引數傳過來一個HashMap的集合
判斷傳過來的日期是符合使用者全年查詢還是全月的查詢
並組裝sql語句
BeanPropertyRowMapper<CallLog> beanPropertyRowMapper = new BeanPropertyRowMapper<>(CallLog.class);
List<CallLog> list = namedParameterJdbcTemplate.query(sql, paramsMap, beanPropertyRowMapper);
通過這兩句返回結果集
前端通過${requestScope.name}引數進行拿值進行處理
五.Kafka問題修復:
初始化zookeeper
IDEA常用快捷鍵:
alt + enter:智慧修復
ctrl + alt + v:自動生成當前物件名
ctrl + alt + t:自動撥出包裹選單
ctrl + o:撥出覆寫選單
ctrl + alt + l:格式化程式碼
ctrl + shift + enter:自動補全當前行程式碼缺失的符號
IDEA方式打包工程:
File--project stru-- arti--點選加號--copy to the output....--ok--ok--build--rebuild