Big Data Phone Call Log Analysis: the callLog Case (Part 4)
阿新 · Published 2018-11-09
I. Change the Kafka topic retention time (the default is 7 days)
-------------------------------------------------
    [kafka/conf/server.properties]
    log.retention.hours=1

II. Aggregation queries with Hive
----------------------------------------------------
    1. Hive command-line queries
        // all call records for caller = xxx, grouped by calltime
        select count(*), calltime from ext_calllogs_in_hbase
            where caller = '15338597777' group by calltime;

        // all call records for caller = xxx, grouped by month
        select count(*), substr(calltime,1,6) from ext_calllogs_in_hbase
            where caller = '15338597777' group by substr(calltime,1,6);

        // call records for caller = xxx in the year 2018, grouped by month
        select count(*), substr(calltime,1,6) from ext_calllogs_in_hbase
            where caller = '15338597777' and substr(calltime,1,4) == '2018'
            group by substr(calltime,1,6);

        // per-month call counts for every caller
        select caller, substr(calltime,1,6), count(*) from ext_calllogs_in_hbase
            group by caller, substr(calltime,1,6);

    2. Programmatic version: monthly call counts for a given number in a given year
        a. Add a new method to the HiveService class
/**
 * Query the per-month call counts for the given caller in the given year.
 */
public List<CalllogStat> statCalllogsCount(String caller, String year) {
    List<CalllogStat> list = new ArrayList<CalllogStat>();
    try {
        Connection conn = DriverManager.getConnection(url);
        Statement st = conn.createStatement();
        // Build the query:
        // select count(*), substr(calltime,1,6) from ext_calllogs_in_hbase
        //     where caller = '15338597777' and substr(calltime,1,4) == '2018'
        //     group by substr(calltime,1,6);
        String sql = "select count(*), substr(calltime,1,6) from ext_calllogs_in_hbase "
                + "where caller = '" + caller + "' and substr(calltime,1,4) == '" + year
                + "' group by substr(calltime,1,6)";
        ResultSet rs = st.executeQuery(sql);
        while (rs.next()) {
            CalllogStat logSt = new CalllogStat();
            logSt.setCount(rs.getInt(1));
            logSt.setYearMonth(rs.getString(2));
            list.add(logSt);
        }
        rs.close();
        st.close();
        conn.close();
        return list;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
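For reference, CalllogStat is the value bean the result rows are mapped into; it is not shown in this part. A minimal sketch, assuming only the two properties used above and read by the JSP (${s.yearMonth}, ${s.count}); the package declaration is omitted:

/**
 * One row of the statistics result: a month (yyyyMM) and its call count.
 * Minimal sketch, only the fields used by statCalllogsCount() and the JSP.
 */
public class CalllogStat {
    //month in yyyyMM form, e.g. "201801"
    private String yearMonth;
    //number of calls in that month
    private int count;
    public String getYearMonth() {
        return yearMonth;
    }
    public void setYearMonth(String yearMonth) {
        this.yearMonth = yearMonth;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }
}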
b. Add a new handler method to the controller
/**
 * Show the query page for the per-month call-count statistics.
 */
@RequestMapping("/calllog/toStatCalllog")
public String toStatCalllog(){
    return "calllog/statCalllog";
}

/**
 * Per-month call-count statistics for the given caller and year.
 */
@RequestMapping("/calllog/statCalllog")
public String statCalllog(Model m, @RequestParam("caller") String caller, @RequestParam("year") String year){
    List<CalllogStat> list = hcs.statCalllogsCount(caller, year);
    m.addAttribute("stat", list);
    return "calllog/statCalllog";
}
c. Write the front-end page to display the result
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
<title>Call Statistics Result</title>
<link rel="stylesheet" type="text/css" href="../css/my.css">
<script type="text/javascript" src="../js/jquery-3.2.0.min.js"></script>
<script type="text/javascript">
//define the table-refresh function
function refreshTable(){
$("#t1 tbody").empty();
$.getJSON("/calllog/json/findAll", function (data) {
$.each(data, function (i, obj) {
var str = "<tr><td>" + obj.caller + "</td>";
str = str + "<td> " + obj.callerName + "</td>";
str = str + "<td> " + obj.callee + "</td>";
str = str + "<td> " + obj.calleeName + "</td>";
str = str + "<td></td>";
str = str + "<td> " + obj.callTime + "</td>";
str = str + "<td> " + obj.callDuration + "</td>";
str = str + "</tr>";
$("#t1 tbody").append(str);
});
});
}
$(function(){
setInterval(refreshTable, 2000);
})
</script>
</head>
<body>
<form action='<c:url value="/calllog/statCalllog" />' method="post">
Phone number: <input type="text" name="caller"><br>
Year: <input type="text" name="year"><br>
<input type="submit" value="Query">
</form>
<br>
<table id="t1" border="1px" class="t-1" style="width: 800px">
<thead>
<tr>
<td>Month</td>
<td>Count</td>
</tr>
</thead>
<tbody>
<c:forEach items="${stat}" var="s">
<tr>
<td><c:out value="${s.yearMonth}"/></td>
<td><c:out value="${s.count}"/></td>
</tr>
</c:forEach>
</tbody>
</table>
</body>
</html>
d. Run the SSM app and open the URL in a browser to test.

III. Using awk on Linux to kill processes in bulk by Java process name
------------------------------------------------------
    1. awk reads its input line by line, splits each line into fields (whitespace is the
       default separator), and then lets you process the pieces.
        $> jps | awk '{print $1}'        // print the first column
        $> jps | awk -F'.' '{print $1}'  // use '.' as the field separator (default is whitespace)
    2. Shell one-liner to stop Kafka
        $> kill -9 `jps | grep Kafka | awk '{print $1}'`
    3. Extract the IP address dynamically
        $> ifconfig | grep inet | head -1 | awk '{print $2}'

IV. Reworking the helper bash scripts
-------------------------------------------------------
    1. [xkill.sh]
        #!/bin/bash
        pids=`jps | grep $1 | awk '{print $1}'`
        for pid in $pids ; do
            kill -9 $pid
        done

    2. [xcall.sh]
        #!/bin/bash
        params=$@
        i=201
        for (( i=201 ; i <= 206 ; i = $i + 1 )) ; do
            tput setaf 2
            echo ============= s$i =============
            tput setaf 7
            ssh -4 s$i "source /etc/profile ; $params"
        done

    3. Start the Kafka cluster [/usr/local/bin/xkafka-cluster-start.sh]
        #!/bin/bash
        servers="s200 s300 s400"
        for s in $servers ; do
            ssh $s "source /etc/profile ; kafka-server-start.sh -daemon /soft/kafka/config/server.properties"
        done

    4. Start the ZooKeeper cluster [/usr/local/bin/xzk-cluster-start.sh]
        #!/bin/bash
        servers="s100 s200 s300"
        for s in $servers ; do
            ssh $s "source /etc/profile ; zkServer.sh start"
        done

    5. xconsumer-start.sh [/usr/local/bin/xconsumer-start.sh]
        #!/bin/bash
        cd /home/centos/KafkaHbaseConsumer
        run.sh &

    6. s201: xflume-calllog-start.sh [/usr/local/bin/xflume-calllog-start.sh]
        #!/bin/bash
        cd /soft/flume/conf
        flume-ng agent -f calllog.conf -n a1 &

V. Modify CalllogController to fix garbled JSON output
------------------------------------------------------------
/**
 * Work with the raw request and response to return the JSON string directly to the front-end page.
*/
@RequestMapping("calllog/json/findAll")
public String findAllJson(HttpServletResponse response)
{
try {
List<Calllog> list = cs.findAll();
String jsonStr = JSONArray.toJSONString(list);
//set the response content type to JSON
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
//get the output stream to the client
ServletOutputStream sos = response.getOutputStream();
sos.write(jsonStr.getBytes("utf-8"));
sos.flush();
sos.close();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
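A more declarative alternative (a sketch, assuming Spring MVC 3.1+ and the same fastjson dependency; the mapping and method names below are made up for illustration) is to let the framework set the content type and charset via produces and return the JSON string with @ResponseBody:

/**
 * Same data as findAllJson above, but letting Spring MVC write the response:
 * produces fixes the charset, @ResponseBody returns the string body directly.
 */
@RequestMapping(value = "calllog/json/findAll2", produces = "application/json;charset=UTF-8")
@ResponseBody
public String findAllJson2() {
    //same service call and fastjson serialization as above
    List<Calllog> list = cs.findAll();
    return JSONArray.toJSONString(list);
}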
VI. Data visualization with ECharts -- bar chart
-----------------------------------------------------
    1. Download the echarts script and place it in the SSM project's web/js directory
        http://echarts.baidu.com/dist/echarts.js
    2. ECharts getting-started demo bar.html -- bar chart
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>bar.html</title>
<script src="../js/jquery-3.2.0.min.js"></script>
<script src="../js/echarts.js"></script>
<script>
$(function () {
var myChart = echarts.init(document.getElementById('main'));
var option = {
title: {
text: 'Monthly call counts for xxxx in 2018'
},
tooltip: {},
legend: {
data: ['Call count']
},
xAxis: {
data: ["1月份", "2月份", "3月份"]
},
yAxis: {},
series: [{
name: 'Call count',
type: 'bar',
data: [100, 300, 280]
}]
};
myChart.setOption(option);
})
</script>
</head>
<body>
<div id="main" style="border:1px solid blue;width:600px;height:400px;">
</div>
</body>
</html>
3. Modify the statistics JSP page statCalllog.jsp
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
<title>Call Statistics Result</title>
<link rel="stylesheet" type="text/css" href="../css/my.css">
<script type="text/javascript" src="../js/jquery-3.2.0.min.js"></script>
<script src="../js/echarts.js"></script>
<script>
$(function () {
var myChart = echarts.init(document.getElementById('main'));
var option = {
title: {
text: '<c:out value="${title}" />'
},
tooltip: {},
legend: {
data: ['Call count']
},
xAxis: {
data: [<c:forEach items="${list}" var="e">'<c:out value="${e.yearMonth}"/>',</c:forEach>]
},
yAxis: {},
series: [{
name: 'Call count',
type: 'bar',
data: [<c:forEach items="${list}" var="e"><c:out value="${e.count}"/>,</c:forEach>]
}]
};
myChart.setOption(option);
})
</script>
</head>
<body>
<form action='<c:url value="/calllog/statCalllog" />' method="post">
Phone number: <input type="text" name="caller"><br>
Year: <input type="text" name="year"><br>
<input type="submit" value="Query">
</form>
<div id="main" style="border:1px solid blue;width:600px;height:400px;">
</div>
</body>
</html>
4. Modify CallLogController.java
/**
 * Per-month call-count statistics for the given caller and year.
*/
@RequestMapping("/calllog/statCalllog")
public String statCalllog(Model m ,@RequestParam("caller") String caller ,@RequestParam("year") String year){
List<CalllogStat> list = hcs.statCalllogsCount(caller, year);
m.addAttribute("title", caller + "在" + year + "年各月份通話次數統計");
m.addAttribute("list", list);
return "calllog/statCalllog" ;
}
VII. Ganglia
-----------------------------
    1. What Ganglia is
        Cluster monitoring: it can monitor the resources of a single host and also aggregate
        statistics over the whole cluster.
        gmond   // collects resource metrics on every node
        gmetad  // receives the metrics sent by each node
        gweb    // the web UI that displays the data; talks to gmetad
    2. Installing Ganglia on CentOS
        a. ganglia-gmond on every node
            $> sudo yum install -y ganglia-gmond
            (on Ubuntu nodes: sudo apt-get install -y ganglia-gmond)
        b. ganglia-gmetad on s201
            $> sudo yum install -y ganglia-gmetad
        c. ganglia-gweb on [s201]
            1) install the dependencies
                $> sudo yum install -y httpd php
            2) download ganglia-web-3.5.12.tar.gz
                wget http://ncu.dl.sourceforge.net/project/ganglia/ganglia-web/3.5.12/ganglia-web-3.5.12.tar.gz
            3) untar the archive
            4) edit the Makefile
                ...
            5) start the services
                [s201]
                $> sudo service httpd start
                $> sudo service gmetad start
                $> sudo service gmond start
                [s202]
                $> sudo service gmond start
    3. If installing with yum reports that no usable repository is available
        a. Switch repositories (e.g. aliyun). For projects in the big-data ecosystem, Cloudera's
           repository is fairly complete and reliable: put cloudera-cdh-5.repo under /etc/yum.repos.d/
        b. Clear the cache
            $> sudo yum clean all
        c. Rebuild the cache
            $> sudo yum makecache
        d. Install via yum again
    4. Installing Ganglia on Ubuntu
    ---------------------------------
        1. On the master node s100, install the monitor and the web front end
           (slave nodes do not need the web front end)
            $s100> sudo apt-get update
            $s100> sudo apt-get install ganglia-monitor rrdtool gmetad ganglia-webfrontend
        2. Configure the ganglia cluster "mycluster"
            a. Copy the Ganglia web front-end Apache configuration:
                $s100> sudo cp /etc/ganglia-webfrontend/apache.conf /etc/apache2/sites-enabled/ganglia.conf
            b. Edit the Ganglia meta-daemon configuration file:
                $s100> sudo nano /etc/ganglia/gmetad.conf
                find        data_source "my cluster" localhost
                change to   data_source "mycluster" 192.168.43.131:8649
                // lists the data source for the cluster: the cluster name followed by IP:port entries;
                // if no port is given, 8649 (the default gmond port) is used.
            c. Edit the gmond configuration on the master node
                $s100> sudo nano /etc/ganglia/gmond.conf
                [------]
                cluster {
                    name = "mycluster"      // change "unspecified" to mycluster
                    owner = "unspecified"
                    latlong = "unspecified"
                    url = "unspecified"
                }
                [-----]
                /* Feel free to specify as many udp_send_channels as you like.
                   Gmond used to only support having a single channel */
                udp_send_channel {
                    # mcast_join = 239.2.11.71
                    host = 192.168.43.131   // add this line: the master IP
                    port = 8649
                    ttl = 1
                }
                [-----]
                /* You can specify as many udp_recv_channels as you like as well. */
                udp_recv_channel {
                    # mcast_join = 239.2.11.71   // commented out
                    port = 8649
                    # bind = 239.2.11.71          // commented out
                }
            d. Save and exit
        3. Configure the slave nodes [s200 s300 s400 s500]
            Copy the gmond.conf edited on the master to every slave node, replacing the original file.
            $s100> scp gmond.conf user@s200:/etc/ganglia     (replace "user" with the login account on the slave)
        4. Start the hadoop and hbase clusters
            start-dfs.sh
            start-yarn.sh
            start-hbase.sh
        5. Start ganglia
            [option 1]
            $> sudo service ganglia-monitor start      (on every machine)
            $> sudo service gmetad start               (on the machine with ganglia-webfrontend installed)
            $> sudo /etc/init.d/apache2 restart        (restart apache2 on the master)
            [option 2]
            A Ganglia installed via apt-get can also be started through the init scripts:
            $s100> sudo /etc/init.d/ganglia-monitor start
            $s100> sudo /etc/init.d/gmetad start
            $s100> sudo /etc/init.d/apache2 restart
        6. Verify the installation
            http://localhost/ganglia/

VIII. Process monitoring over UDP
----------------------------------------------------------
    1. Add a new class udp.HeartBeatThread to the data-generation module
    ---------------------------------------------------
package udp;
import calllog.gen.main.PropertiesUtil;
import java.io.IOException;
import java.net.*;
/**
 * Utility thread: sends heartbeat messages to show the program is still alive.
 * Used by the monitoring module.
 */
public class HeartBeatThread extends Thread{
private DatagramSocket sock;
public HeartBeatThread()
{
try {
sock = new DatagramSocket(PropertiesUtil.getInt("heartbeat.udp.send.port"));
//daemon thread
this.setDaemon(true);
} catch (SocketException e) {
e.printStackTrace();
}
}
@Override
public void run() {
byte[] bs = new byte[1];
bs[0] = (byte)PropertiesUtil.getInt("heartbeat.udp.send.flag");
DatagramPacket packet = new DatagramPacket(bs,1);
String bcAddr = PropertiesUtil.getString("heartbeat.udp.send.bcAddr");
int bcPort = PropertiesUtil.getInt("heartbeat.udp.send.bcPort");
packet.setSocketAddress(new InetSocketAddress(bcAddr,bcPort));
while(true)
{
try {
sock.send(packet);
Thread.sleep(PropertiesUtil.getInt("heartbeat.udp.send.sleep.ms"));
System.out.println("資料生成模組,傳送1次心跳" + bs[0]);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
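HeartBeatThread reads all of its settings through calllog.gen.main.PropertiesUtil, which is not shown in this part. A minimal sketch of what such a helper could look like (an assumption, not the original class; the properties file name is also assumed):

package calllog.gen.main;
import java.io.InputStream;
import java.util.Properties;
/**
 * Sketch of a properties helper: loads a classpath properties file once
 * and exposes typed getters (assumed implementation, not the original).
 */
public class PropertiesUtil {
    private static final Properties props = new Properties();
    static {
        try {
            //hypothetical file name on the classpath
            InputStream in = PropertiesUtil.class.getClassLoader()
                    .getResourceAsStream("calllog.properties");
            props.load(in);
            in.close();
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    public static String getString(String key) {
        return props.getProperty(key);
    }
    public static int getInt(String key) {
        return Integer.parseInt(props.getProperty(key).trim());
    }
}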
2. Modify the main function of the data-generation App so that the monitoring thread starts together with the app
public static void main(String [] args)
{
    //start the heartbeat thread before the log generator,
    //since genCallLog() may keep generating data and not return
    new HeartBeatThread().start();
    genCallLog();
}
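Before wiring up the SSM side, the broadcast can be checked with a tiny standalone listener (a throwaway test class, not part of the original project; the port must match heartbeat.udp.send.bcPort):

package udp;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
/**
 * Throwaway test: prints every heartbeat byte received on the broadcast port.
 */
public class HeartBeatProbe {
    public static void main(String[] args) throws Exception {
        DatagramSocket sock = new DatagramSocket(9999);   //heartbeat.udp.send.bcPort
        byte[] buf = new byte[1];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        while (true) {
            sock.receive(packet);
            System.out.println("heartbeat " + buf[0] + " from "
                    + packet.getAddress().getHostAddress());
        }
    }
}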
3. Add a monitoring class com.ssm.monitor.MonitorService to the SSM module; it tracks the heartbeat messages sent by the other processes
package com.ssm.monitor;
import com.it18zhang.ssm.domain.HeartBeat;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
 * Service that keeps track of the heartbeat messages sent by other programs.
 */
@Service("monitorService")
public class MonitorService extends Thread{
private ReceiveThread t;
public MonitorService()
{
//start the receiving thread
t = new ReceiveThread();
t.start();
}
public List<HeartBeat> getHeartBeats()
{
return new ArrayList<HeartBeat>(t.map.values());
}
}
4. Add the receiver thread class com.ssm.monitor.ReceiveThread to the SSM module
package com.ssm.monitor;
import com.it18zhang.ssm.domain.HeartBeat;
import com.it18zhang.ssm.util.PropertiesUtil;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ReceiveThread extends Thread{
private DatagramSocket sock;
//sender ip -> heartbeat carrying the timestamp of the last message received
//(a ConcurrentHashMap, because this thread writes while the web layer reads)
public Map<String, HeartBeat> map = new ConcurrentHashMap<String, HeartBeat>();
public ReceiveThread() {
try{
//bind the receive port
sock = new DatagramSocket(PropertiesUtil.getInt("heartbeat.udp.receive.port"));
//daemon thread
this.setDaemon(true);
System.out.println("開始接收心跳 ...");
}
catch(Exception e)
{
e.printStackTrace();
}
}
@Override
public void run() {
byte[] bs = new byte[1];
DatagramPacket packet = new DatagramPacket(bs,1);
while(true)
{
try {
sock.receive(packet);
int flag = bs[0];
InetSocketAddress addr = (InetSocketAddress) packet.getSocketAddress();
String sendIp = addr.getAddress().getHostAddress();
map.put(sendIp,new HeartBeat(sendIp, flag, System.currentTimeMillis()));
System.out.println("接收心跳" + flag);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
5. Add a new javabean to the SSM domain package: the HeartBeat class, which wraps one heartbeat message
package com.it18zhang.ssm.domain;
public class HeartBeat {
//ip of the udp sender
private String ip;
//the message content that was sent
private int flag;
//timestamp of the most recent message received
private long ts;
public HeartBeat() {
}
public HeartBeat(String ip, int flag, long ts) {
this.ip = ip;
this.flag = flag;
this.ts = ts;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getFlag() {
return flag;
}
public void setFlag(int flag) {
this.flag = flag;
}
public long getTs() {
return ts;
}
public void setTs(long ts) {
this.ts = ts;
}
}
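The ts field is what turns raw heartbeats into an up/down status: if no packet has arrived from a sender within a few multiples of heartbeat.udp.send.sleep.ms, that process can be treated as dead. A small helper sketch (not part of the original classes; the threshold parameter is an assumption):

/**
 * Sketch of a liveness check: a sender is considered alive if its last
 * heartbeat arrived within the given window, e.g. 3 * 1000 ms when
 * heartbeat.udp.send.sleep.ms=1000.
 */
public static boolean isAlive(HeartBeat hb, long maxSilenceMs) {
    return (System.currentTimeMillis() - hb.getTs()) < maxSilenceMs;
}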
6. Copy the heartbeat sender HeartBeatThread and its utility class into the consumer module,
   and add the heartbeat properties to the kafka configuration file
    [kafka.properties]
    zookeeper.connect=s100:2181,s200:2181,s300:2181
    group.id=calllog
    zookeeper.session.timeout.ms=500
    zookeeper.sync.time.ms=250
    auto.commit.interval.ms=1000
    # consume from the beginning
    auto.offset.reset=smallest
    # topic
    topic=calllog
    # table name
    table.name=call:calllogs
    # number of partitions
    partition.number=100
    # caller flag
    caller.flag=0
    # format pattern for the hash region
    hashcode.pattern=00

    heartbeat.udp.send.port=6666
    heartbeat.udp.send.flag=3
    heartbeat.udp.send.bcAddr=192.168.43.255
    heartbeat.udp.send.bcPort=9999
    heartbeat.udp.send.sleep.ms=1000

7. Modify the HbaseCustomer class: start the heartbeat thread in its main function
package calllog.kafka.hbase.customer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * HBase consumer: pulls log messages from kafka and stores them in hbase.
 */
public class HbaseCustomer {
public static void main(String [] args)
{
//start the heartbeat sender
new HeartBeatThread().start();
//hbase dao
HbaseDao dao = new HbaseDao();
//build the consumer configuration
ConsumerConfig config = new ConsumerConfig(PropertiesUtil.props);
//create the consumer
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
//bind the topic
String topic = PropertiesUtil.getPorp("topic");
Map<String, Integer> map = new HashMap<String, Integer>();
map.put(topic, new Integer(1));
//start consuming
Map<String, List<KafkaStream<byte[], byte[]>>> kafkaMsg = consumer.createMessageStreams(map);
List<KafkaStream<byte[], byte[]>> msgList = kafkaMsg.get(topic);
String kafka_hbaseMsg = "";
for(KafkaStream<byte[],byte[]> msg : msgList)
{
ConsumerIterator<byte[],byte[]> mm = msg.iterator();
while (mm.hasNext()) {
MessageAndMetadata<byte[], byte[]> next = mm.next();
byte [] m = next.message();
//get the message
kafka_hbaseMsg = new String(m);
//write to hbase
dao.put(kafka_hbaseMsg);
}
}
}
}
8. Write the MonitorController for the monitoring pages in the SSM module
--------------------------------------
package com.it18zhang.ssm.web.controller;
import com.alibaba.fastjson.JSON;
import com.it18zhang.ssm.domain.HeartBeat;
import com.ssm.monitor.MonitorService;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import java.util.List;
@Controller
public class MonitorController {
@Resource(name="monitorService")
private MonitorService ms;
@RequestMapping("/monitor/toMonitorPage")
public String toMonitorPage()
{
return "monitor/monitorPage";
}
@RequestMapping("/json/monitor/getMonitorInfo")
public String getMonitorInfo(@RequestParam("heartbeat")HeartBeat ht)
{
List<HeartBeat> list = ms.getHeartBeats();
return JSON.toJSONString(list);
}
}
9. Write the JSP page monitor/monitorPage.jsp
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
<title>Call Log Monitor</title>
<link rel="stylesheet" type="text/css" href="../css/my.css">
<script type="text/javascript" src="../js/jquery-3.2.0.min.js"></script>
<script type="text/javascript" >
//define the polling function
function getHearbeatInfo(){
$("#div1").empty();
$.getJSON("/json/monitor/getMonitorInfo", function (data) {
$("#div1").append(data);
});
}
$(function(){
setInterval(getHearbeatInfo, 1000);
})
</script>
</head>
<body>
<div id="div1" style="border:1px solid blue;width: 400px ; height: 300px">
</div>
</body>
</html>
10. Package the data-generation and consumer programs, copy them to the Ubuntu machines and run them, then open the SSM web pages to test.