流式計算--實戰(日誌監控系統)
1.日誌監控系統
資料的流向:flume+kafka+storm+mysql
資料流程如下:
-
應用程式使用log4j產生日誌
- 部署flume客戶端監控應用程式產生的日誌資訊,併發送到kafka叢集中
- storm spout拉去kafka的資料進行消費,逐條過濾每條日誌的進行規則判斷,對符合規則的日誌進行郵件告警。
- 最後將告警的資訊儲存到mysql資料庫中,用來進行管理。
資料從flume到kafka到storm在這一篇部落格
2.資料模型設計
1.使用者表:用來儲存使用者的資訊,包括賬號、手機號碼、郵箱、是否有效等資訊
2.應用表:用來儲存應用的資訊,包括應用名稱、應用描述、應用是否線上等資訊
3.應用型別表: 用來儲存應用的型別等資訊
4.規則表:用來儲存規則的資訊,包括規則名稱,規則描述,規則關鍵詞等資訊
5.規則記錄表:用來儲存觸發規則後的記錄,包括告警編號、是否簡訊告知、是否郵件告知、告警明細等資訊。
3.程式碼開發
新建一個maven工程,pom檔案如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wx</groupId>
<artifactId>logmonitoring</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!--storm的以來包-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.6</version>
<!-- <scope>provided</scope>-->
</dependency>
<!--KafkaSpout的依賴包,這個就可以把kafka的資料流到storm-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.6</version>
<!-- <exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>-->
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<!--kafka的依賴包-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.1</version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.2.0.RELEASE</version>
</dependency>
<!--傳送雲簡訊的依賴-->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.0.6</version> <!--注:如提示報錯,先升級基礎包版,無法解決可聯絡技術支援-->
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dysmsapi</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!--打包的時候把專案其他依賴的一些jar,一些類打成一個整體-->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.wx.kafkaandstorm.KafkaAndStormTopologyMain</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
編寫一個topology:這是主線,根據這個來編寫邏輯
package com.wx.logmonitor;
import com.wx.logmonitor.bolt.FilterBolt;
import com.wx.logmonitor.bolt.PrepareRecordBolt;
import com.wx.logmonitor.bolt.SaveMessage2MySql;
import com.wx.logmonitor.spout.RandomSpout;
import com.wx.logmonitor.spout.StringScheme;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
/*
日誌監控系統驅動類
*/
public class LogMonitorTopologyMain {
private static Logger logger = Logger.getLogger(LogMonitorTopologyMain.class);
public static void main(String[] args) throws Exception{
// 使用TopologyBuilder進行構建驅動類
TopologyBuilder builder = new TopologyBuilder();
// 設定kafka的zookeeper叢集
// BrokerHosts hosts = new ZkHosts("zk01:2181,zk02:2181,zk03:2181");
//// // 初始化配置資訊
// SpoutConfig spoutConfig = new SpoutConfig(hosts, "logmonitor", "/aaa", "log_monitor");
// 在topology中設定spout
// builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig),3);
// builder.setSpout("kafka-spout",new RandomSpout(new StringScheme()),2);
builder.setSpout("kafka-spout",new RandomSpout(new StringScheme()),2);
builder.setBolt("filter-bolt",new FilterBolt(),3).shuffleGrouping("kafka-spout");
builder.setBolt("prepareRecord-bolt",new PrepareRecordBolt(),2).fieldsGrouping("filter-bolt", new Fields("appId"));
builder.setBolt("saveMessage-bolt",new SaveMessage2MySql(),2).shuffleGrouping("prepareRecord-bolt");
//啟動topology的配置資訊
Config topologConf = new Config();
//TOPOLOGY_DEBUG(setDebug), 當它被設定成true的話, storm會記錄下每個元件所發射的每條訊息。
//這在本地環境除錯topology很有用, 但是在線上這麼做的話會影響效能的。
topologConf.setDebug(true);
//storm的執行有兩種模式: 本地模式和分散式模式.
if (args != null && args.length > 0) {
//定義你希望叢集分配多少個工作程序給你來執行這個topology
topologConf.setNumWorkers(2);
//向叢集提交topology
StormSubmitter.submitTopologyWithProgressBar(args[0], topologConf, builder.createTopology());
} else {
topologConf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", topologConf, builder.createTopology());
Utils.sleep(10000000);
cluster.shutdown();
}
}
}
其中RandomSpout模擬接受kafka的資料,接受的為一條一條的日誌資料,因為在網路端進行nio傳輸,所以把他包裹成ByteBuffer物件再序列化進行傳輸,指定一個line域,傳到下游bolt進行處理。spout主要程式碼:
package com.wx.logmonitor.spout;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
*隨機產生訊息傳送出去
*/
public class RandomSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private TopologyContext context;
private List list ;
private final StringScheme scheme;
public RandomSpout(final StringScheme scheme) {
super();
this.scheme = scheme;
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
list = new ArrayList();
list.add("1$$$$$error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");
list.add("2$$$$$java.sql.SQLException: You have an error in your SQL syntax;");
list.add("1$$$$$error Unable to connect to any of the specified MySQL hosts.");
list.add("1$$$$$error:Servlet.service() for servlet action threw exception java.lang.NullPointerException");
}
/**
* 傳送訊息 storm 框架在 while(true) 呼叫nextTuple方法
*/
public void nextTuple() {
final Random rand = new Random();
String msg = list.get(rand.nextInt(4)).toString();
//序列化的時候在網路上nio傳輸,所以需要ByteBuffer型別的資料
ByteBuffer buffer = RandomSpout.getByteBuffer(msg);
this.collector.emit(this.scheme.deserialize(buffer));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*
將String轉化為Buffer
*/
public static ByteBuffer getByteBuffer(String str) {
return ByteBuffer.wrap(str.getBytes());
}
//訊息源可以傳送多條訊息流stream,多條訊息流可以理解為多種型別的資料
public void declareOutputFields(final OutputFieldsDeclarer declarer) {
// line
declarer.declare(this.scheme.getOutputFields());
}
}
下游的bolt接到資料後進行解析過濾處理,過濾的規則主要有看看日誌資訊是否來源於日誌監控系統監控的應用(判斷是否是isonline的應用),其次日誌資訊需要對照規則表,看看是否觸發了警告的規則。如果這兩點滿足則指定應用id域和訊息域傳到下一個bolt處理。FilterBolt的主要程式碼:
package com.wx.logmonitor.bolt;
import com.wx.logmonitor.domain.Message;
import com.wx.logmonitor.utils.MonitorHandler;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* Describe: 過濾規則資訊
*/
//BaseRichBolt 需要手動調ack方法,BaseBasicBolt由storm框架自動調ack方法
public class FilterBolt extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(FilterBolt.class);
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
}
public void execute(Tuple input, BasicOutputCollector collector) {
//獲取KafkaSpout傳送出來的資料
String line = input.getString(0);
//獲取kafka傳送的資料,是一個byte陣列
// byte[] value = (byte[]) input.getValue(0);
//將陣列轉化成字串
// String line = new String(value);
//對資料進行解析
// appid content
//1 error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao
//把讀到的資料轉化為一個自定義訊息物件,暫時只賦值了訊息的內容和應用的名稱兩個欄位
Message message = MonitorHandler.parser(line);
if (message == null) {
return;
}
//對日誌進行規制判定,看看是否觸發規則,如果滿足條件,message的對應規則id和關鍵字屬性會被賦值
if (MonitorHandler.trigger(message)) {
//定義兩個域,輸出到下游進行處理。
collector.emit(new Values(message.getAppId(), message));
}
//定時更新規則資訊
MonitorHandler.scheduleLoad();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//定義這兩個域,然後輸出交給下游Bolt處理。
declarer.declare(new Fields("appId", "message"));
}
}
下游接受到過濾後的訊息後 就發郵件通知運維人員,並且將本次操作作為一條記錄傳送到下一個bolt,PrepareRecordBolt的主要程式碼:
package com.wx.logmonitor.bolt;
import com.wx.logmonitor.domain.Message;
import com.wx.logmonitor.domain.Record;
import com.wx.logmonitor.utils.MonitorHandler;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* Describe: 將觸發資訊儲存到mysql資料庫中
*/
//BaseRichBolt 需要手動調ack方法,BaseBasicBolt由storm框架自動調ack方法
public class PrepareRecordBolt extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(PrepareRecordBolt.class);
public void execute(Tuple input, BasicOutputCollector collector) {
Message message = (Message) input.getValueByField("message");
String appId = input.getStringByField("appId");
//將觸發規則的資訊進行通知,
MonitorHandler.notifly(appId, message);
Record record = new Record();
try {
BeanUtils.copyProperties(record, message);
//定義記錄域,輸出這個記錄交給下游處理
collector.emit(new Values(record));
} catch (Exception e) {
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("record"));
}
}
下一個Bolt接收到操作記錄以後就執行插入mysql資料庫的操作,儲存此紀錄,SaveMessage2MySql的主要程式碼:
package com.wx.logmonitor.bolt;
import com.wx.logmonitor.domain.Record;
import com.wx.logmonitor.utils.MonitorHandler;
import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
/**
* Describe: 請補充類描述
*/
public class SaveMessage2MySql extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(SaveMessage2MySql.class);
public void execute(Tuple input, BasicOutputCollector collector) {
Record record = (Record) input.getValueByField("record");
MonitorHandler.save(record);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
好了流程走完了,看看執行的結果:
成功收到郵件,本來還想弄個簡訊通知,結果阿里雲簡訊簽名沒有申請過,算了