Storm Real-Time Heatmap Project
I. Overview
This article walks through building a real-time computation pipeline with Storm + Logstash + Kafka, and rendering the results as a heatmap with the AMap (Gaode Maps) API.
Background:
In some situations we need to understand how people move through an area in real time, for example monitoring how crowded a tourist attraction is. A GPS-based system could compute this from the position data reported by visitors inside the area, but GPS has a drawback: not every visitor has GPS enabled. In that case cellular "signaling" can be used to obtain per-person location information. "Signaling" refers to the fact that every phone, unless powered off, periodically reports to the nearest base station. Anyone who has traveled by car has received a text such as "Welcome to city X" when entering a new region; mobile carriers use exactly this signaling to track each subscriber's location. (It also shows how hard personal privacy is to protect in the big-data era.)
1. Project Architecture
Here we use Logstash, a log collector similar to Flume, to ingest the log data. Since this is only an experimental project, the logs themselves are generated by a Python simulator. Logstash ships the data into Kafka, Storm consumes it in real time, and the computed results are written into a MySQL database, from which a backend application can read them for visualization.
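End to end, the data flows like this:
Python simulator -> access.log -> Logstash -> Kafka (topic storm_project) -> Storm topology -> MySQL -> Spring Boot API -> AMap heatmap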
2. Environment and Software Versions
- storm-1.1.1
- zookeeper-3.4.5-cdh5.3.6
- logstash-2.4.1
- kafka_2.11-0.9.0.0
II. Implementation
1. Simulating the Data
The following Python script appends random phone/coordinate records to the access log:
#coding=UTF-8
import random
import time

phone = [
    "13869555210", "18542360152", "15422556663", "18852487210",
    "13993584664", "18754366522", "15222436542", "13369568452",
    "13893556666", "15366698558"
]

location = [
    "116.191031, 39.988585", "116.389275, 39.925818",
    "116.287444, 39.810742", "116.481707, 39.940089",
    "116.410588, 39.880172", "116.394816, 39.91181",
    "116.416002, 39.952917"
]

def sample_phone():
    return random.sample(phone, 1)[0]

def sample_location():
    return random.sample(location, 1)[0]

def generator_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # append tab-separated records: phone \t "lng, lat" \t timestamp
    f = open("/opt/storm_project/datas/logs/access.log", "a+")
    while count >= 1:
        query_log = "{phone}\t{location}\t{date}".format(
            phone=sample_phone(), location=sample_location(), date=time_str)
        f.write(query_log + "\n")
        count = count - 1
    f.close()

if __name__ == '__main__':
    generator_log(100)
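The script can be run directly whenever fresh data is needed (for example python mock_data.py, where the file name is just a placeholder); each run appends 100 records stamped with the current time, so re-running it while the pipeline is up keeps producing events that fall inside the 10-minute window queried later.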
2. Logstash Configuration
Create the configuration file storm_pro.conf under the Logstash installation directory:
input {
    file {
        path => '/opt/storm_project/datas/logs/access.log'
    }
}
output {
    kafka {
        topic_id => "storm_project"
        batch_size => 1
        bootstrap_servers => "hadoop-senior.shinelon.com:9092"
        codec => plain {
            format => "%{message}"
        }
    }
}
Note: path points to the file Logstash tails for input; create it yourself if it does not exist. topic_id must match the Kafka topic created below. The plain codec with format => "%{message}" makes Logstash publish just the raw log line rather than a full serialized event, so the Storm bolt can split it on tabs directly.
3. Kafka Configuration
In the Kafka installation directory, set the following in config/server.properties:
broker.id=0
############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-senior.shinelon.com
zookeeper.connect=hadoop-senior.shinelon.com:2181
Note: Kafka depends on ZooKeeper, so zookeeper.connect must point at a running ZK instance.
4. The Storm Topology
package cn.just.shinelon.integration;

import cn.just.shinelon.utils.DateUtil;
import com.google.common.collect.Maps;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.kafka.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

public class KafkaTopology {
    /**
     * Source of RawMultiScheme, the spout's default scheme. It shows why the
     * bolt below reads a single binary field named "bytes":
     *
     * public class RawMultiScheme implements MultiScheme {
     *     public RawMultiScheme() {
     *     }
     *     public Iterable<List<Object>> deserialize(ByteBuffer ser) {
     *         return Arrays.asList(Utils.tuple(new Object[]{Utils.toByteArray(ser)}));
     *     }
     *     public Fields getOutputFields() {
     *         return new Fields(new String[]{"bytes"});
     *     }
     * }
     */
    public static class PrintBolt extends BaseRichBolt {
        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }

        @Override
        public void execute(Tuple tuple) {
            try {
                byte[] bytes = tuple.getBinaryByField("bytes");
                String input = new String(bytes);
                String[] logs = input.split("\t");
                String phone = logs[0];
                String tmp = logs[1];
                // longitude
                Double longitude = Double.parseDouble(tmp.split(",")[0]);
                // latitude
                Double latitude = Double.parseDouble(tmp.split(",")[1]);
                // event time in millis; the query layer later filters on the most recent N minutes
                long timestamp = DateUtil.getInstance().getTime(logs[2]);
                System.out.println(phone + ", " + longitude + "," + latitude + ", " + timestamp);
                // emit the parsed record downstream
                outputCollector.emit(new Values(timestamp, latitude, longitude));
                outputCollector.ack(tuple);
            } catch (Exception e) {
                e.printStackTrace();
                outputCollector.fail(tuple);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("time", "latitude", "longitude"));
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // JDBC (HikariCP) connection settings
        Map hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/storm");
        hikariConfigMap.put("dataSource.user", "root");
        hikariConfigMap.put("dataSource.password", "123456");
        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
        // target table; the emitted fields (time, latitude, longitude) must match its columns
        String tableName = "location";
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
        JdbcInsertBolt userPersistenceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                .withInsertQuery("insert into location values(?,?,?)")
                .withQueryTimeoutSecs(30);
        // ZooKeeper address
        BrokerHosts hosts = new ZkHosts("hadoop-senior.shinelon.com:2181");
        String topicName = "storm_project";
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
        // consume from the latest offset; by default the spout would start from the beginning of the topic
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        builder.setSpout("KafkaSpout", kafkaSpout);
        builder.setBolt("PrintBolt", new PrintBolt()).shuffleGrouping("KafkaSpout");
        builder.setBolt("JdbcInsertBolt", userPersistenceBolt).shuffleGrouping("PrintBolt");

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("KafkaTopology", new Config(), builder.createTopology());
    }
}
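DateUtil is a project utility referenced above but not listed in the article. A minimal sketch of what it needs to do, assuming the yyyy-MM-dd HH:mm:ss format written by the Python simulator, could look like this:

package cn.just.shinelon.utils;

import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical reconstruction of the DateUtil helper used by PrintBolt:
// parses the simulator's timestamp string into epoch milliseconds.
public class DateUtil {
    private static final DateUtil INSTANCE = new DateUtil();

    public static DateUtil getInstance() {
        return INSTANCE;
    }

    public long getTime(String dateStr) throws Exception {
        // SimpleDateFormat is not thread-safe, so create one per call
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = format.parse(dateStr);
        return date.getTime();
    }
}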
5. Database Schema
create database storm;
use storm;
create table location(
time bigint,
latitude double,
longitude double
)charset utf8;
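Note that time holds the epoch-millisecond timestamp emitted by the bolt, which is why it is a bigint rather than a datetime column; the DAO later compares it against unix_timestamp(...)*1000 to select only the most recent 10-minute window.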
6. Starting the Cluster
First make sure ZooKeeper is running (e.g. started with bin/zkServer.sh start), then start Kafka:
nohup bin/kafka-server-start.sh config/server.properties &
Create the topic:
bin/kafka-topics.sh --create --zookeeper hadoop-senior.shinelon.com:2181 --replication-factor 1 --partitions 1 --topic storm_project
Note: the topic name must match the topic_id configured in Logstash.
Start Logstash:
bin/logstash -f storm_pro.conf
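Before starting the topology, it can help to confirm that events are actually reaching Kafka. The stock console consumer that ships with Kafka (adjust the host to your setup) prints whatever Logstash publishes:
bin/kafka-console-consumer.sh --zookeeper hadoop-senior.shinelon.com:2181 --topic storm_project --from-beginning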
With Kafka and Logstash running, start the Storm topology, then run the Python data simulator; the computed results will start appearing in the database.
III. Data Visualization
The visualized result is shown in the figure below:
The front-end page is as follows:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8"/>
<title>AMap heatmap</title>
<link rel="stylesheet" href="http://cache.amap.com/lbs/static/main1119.css"/>
</head>
<body>
<script src="js/echarts.min.js"></script>
<script src="js/jquery.min.js"></script>
<script src="http://webapi.amap.com/maps?v=1.4.9&key=d16808eab90b7545923a1c2f4bb659ef"></script>
<div id="container"></div>
<script>
var map = new AMap.Map("container", {
resizeEnable: true,
center: [116.418261, 39.921984],
zoom: 11
});
var heatmap;
var points = (function () { // populate the heatmap points from the backend
var city=[];
$.ajax({
type:"POST",
url:"../get_map",
dataType:'json',
async: false, // synchronous, so points is filled before the heatmap is built
success:function(result){
for(var i=0;i<result.length;i++){
city.push({"lng":result[i].longitude,"lat":result[i].latitude,"count":result[i].count});
}
}
})
return city;
})();
/**
Sample of the data shape points ends up holding:
[
{"lng":116.191031,"lat":39.988585,"count":1000},
{"lng":116.389275,"lat":39.925818,"count":110},
{"lng":116.287444,"lat":39.810742,"count":1200},
{"lng":116.481707,"lat":39.940089,"count":130},
{"lng":116.410588,"lat":39.880172,"count":140},
{"lng":116.394816,"lat":39.91181,"count":15552},
{"lng":116.416002,"lat":39.952917,"count":16}
]
**/
map.plugin(["AMap.Heatmap"], function() { // load the heatmap plugin
    heatmap = new AMap.Heatmap(map, {
        radius: 50,
        opacity: [0, 0.7]
    }); // overlay a heatmap on the map object
    heatmap.setDataSet({data: points, max: 100}); // set the heatmap dataset
    // see the AMap API docs for the full parameter list
});
// var map = new AMap.Map('container', {
//     pitch: 75,      // map pitch angle, valid range 0-83 degrees
//     viewMode: '3D'  // map mode
// });
</script>
</body>
</html>
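The page POSTs to ../get_map and expects a JSON array of objects with longitude, latitude, and count fields, which it converts into the {lng, lat, count} points consumed by the heatmap. The Spring Boot code below provides both this page (served as the map view behind /storm) and that JSON endpoint.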
The Spring Boot DAO layer is as follows:
package cu.just.spark.dao;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

import org.springframework.stereotype.Component;

import cu.just.spark.domain.Location;
import cu.just.spark.utils.MysqlUtil;

/**
 * @author shinelon
 */
@Component
public class LocationDao {
    public List<Location> map() throws Exception {
        List<Location> list = new ArrayList<Location>();
        Connection connection = null;
        PreparedStatement psmt = null;
        try {
            connection = MysqlUtil.getConnection();
            // aggregate hits per coordinate over the last 10 minutes
            // (time is stored in epoch millis, hence the *1000)
            psmt = connection.prepareStatement("select longitude,latitude,count(*) from location where "
                    + "time>unix_timestamp(date_sub(current_timestamp(),interval 10 minute))*1000 "
                    + "group by longitude,latitude");
            ResultSet resultSet = psmt.executeQuery();
            while (resultSet.next()) {
                Location location = new Location();
                location.setLongitude(resultSet.getDouble(1));
                location.setLatitude(resultSet.getDouble(2));
                location.setCount(resultSet.getInt(3));
                list.add(location);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            MysqlUtil.release();
        }
        return list;
    }
}
The entity class:
package cu.just.spark.domain;

public class Location {
    private Integer count;
    private double latitude;
    private double longitude;

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public double getLatitude() {
        return latitude;
    }

    public void setLatitude(double latitude) {
        this.latitude = latitude;
    }

    public double getLongitude() {
        return longitude;
    }

    public void setLongitude(double longitude) {
        this.longitude = longitude;
    }
}
The utility class:
package cu.just.spark.utils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlUtil {
    // JDBC URL for the storm database created above
    private static final String DB_URL = "jdbc:mysql://localhost:3306/storm?user=root&password=123456";
    private static Connection connection;
    private static PreparedStatement pstm;
    private static ResultSet resultSet;

    public static Connection getConnection() {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            connection = DriverManager.getConnection(DB_URL);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }

    public static void release() {
        try {
            if (resultSet != null) {
                resultSet.close();
            }
            if (pstm != null) {
                pstm.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection = null; // help GC
            }
        }
    }
}
The controller layer:
package cu.just.spark.controller;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;
import cu.just.spark.dao.LocationDao;
import cu.just.spark.domain.Location;
@RestController
public class MapController {
    @Autowired
    public LocationDao locationDao;

    @RequestMapping("/storm")
    public ModelAndView storm() {
        // serves the heatmap page (the map view shown above)
        return new ModelAndView("map");
    }

    @RequestMapping("/get_map")
    @ResponseBody
    public List<Location> getMap() throws Exception {
        // JSON endpoint polled by the page's AJAX call
        return locationDao.map();
    }
}
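For reference, with the Location entity above the /get_map response is a JSON array shaped like this (illustrative values, not real output):
[{"count":12,"latitude":39.988585,"longitude":116.191031},
 {"count":3,"latitude":39.925818,"longitude":116.389275}]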
Project source code: source code