日誌監控告警系統的設計與實現
基於的日誌進行監控,監控需要一定規則,對觸發監控規則的日誌資訊進行告警,告警的方式,是簡訊和郵件。
log4j---->error,info,debug 應用程式程式的日誌 error級別 TimeOutException 角標越界IndexXXXException ......Error
com.alibaba.jstorm.daemon.worker.WorkerData]-[INFO] Current worker taskList:[1, 2, 3, 4, 5, 6, 7]
String.contains.(" taskList ")-------------->當訂單量觸發一千萬時,告警通知,讓大家慶祝下。
OrdertotalNum:1000萬
kafaka生成叢集的原理、分割槽
kafka消費者的負載均衡,kfakaSpout
Kafka broker(核心機制,topic,分片,檔案儲存機制)
Redis API學習
spout:從外部資料來源中讀取資料,然後轉換為topology
架構圖:
DataSource:外部資料來源
Spout:接收外部資料來源的元件,將外部資料來源轉化成storm內部的資料,以Tuple為基本的傳輸單元下發給Bolt.
Bolt:接受Spout傳送的資料,或上游的bolt的傳送的資料,根據業務邏輯進行處理,傳送給下一個Bolt或者是儲存到某種介質上,例如Redis。
Tuple:Storm內部中資料傳輸的基本單元,裡面封裝了一個List物件,用來儲存資料。
StreamGroup:資料分鐘策略,7種,shuffleGrouping,Non Grouping,FieldGrouping,Local or ShuffleGrouping.
Nimbus:任務分配
Supervisor:接受任務,並啟動worker,worker的數量是根據埠號來的。
Worker:執行任務的具體元件(JVM),可以執行兩種型別的任務,Spout任務或者bolt任務
Task:一個task屬於一個Spout或者Bolt併發任務。
zk:儲存任務分配的資訊,心跳資訊,元資料資訊。
1、背景知識
一款優秀的軟體需要具備的特點
l 軟體的實用性。
所謂有的放矢,軟體的誕生是為了解決特定的問題,比如現在流行的MVC 框架,早期的沒有MVC 開發的時候,耦合度很大,後期維護更新成本高,難度大,這樣MVC 框架就孕育而生;比如陌陌這種社交軟體,是為了解決陌生人之間交流的問題;比如疼醒這種軟體是為了解決人們遠端溝通的問題;比如OA系統為了解決公司協同流程、專案管理、知識管理等問題……所以一款優秀的軟體必須能夠解決一個領域內的問題。
l 軟體的穩定性。
軟體的實用性問題解決之後,急需要解決的問題就是軟體的穩定性。一般線上系統都會承載企業的某項業務,系統的穩定性直接影響了業務是否能夠正常運營。很多創業公司在前期只注重業務的發展,不太在意系統的穩定性,一旦使用者兩比較大的之後,就會出現很多效能的問題。這種情況就好比,你找了一個妹子,並準備深入交往後結婚,卻發現這個妹子總是有很多異性朋友在聯絡……
l 程式碼的規範性
鐵打的營盤流水的兵,一款優秀的軟體不僅僅是功能的實現。整體架構、功能模組、程式碼註釋、擴充套件性等問題也也需要考慮,畢竟在一個軟體的生命週期過程中,參與的人實在是太多了,主創人員也可能隨時流式。所以程式碼的規範性就難能可貴了。
l 升級保持向前相容性。
如果一個軟體平常使用挺好的,但是升級卻越來越費勁,或者升級後穩定性大打折扣,也難以稱得上一個好的軟體。
l 基本的使用手冊
文件、文件、文件、一個簡單有效的使用手冊,才是程式的王道,知其然才能知其所以然。能讓使用者一目瞭然,功能、架構、設計思路、程式碼等等。
2、需求分析
隨著公司業務發展,支撐公司業務的各種系統越來越多,為了保證公司的業務正常發展,急需要對這些線上系統的執行進行監控,做到問題的及時發現和處理,最大程度減少對業務的影響。
目前系統分類有:
1) 有基於Tomcat的web應用
2) 有獨立的Java Application應用
3) 有執行在linux上的指令碼程式
4) 有大規模的叢集框架(zookeeper、Hadoop、Storm、SRP……)
5) 有作業系統的執行日誌
主要功能需求分為:
監控系統日誌中的內容,按照一定規則進行過濾
發現問題之後通過簡訊和郵件進行告警
3、功能分析
l 資料輸入
使用flume客戶端獲取個系統的資料;
使用者通過頁面輸入系統名稱、負責人觸發規則等資訊
l 資料儲存
使用flume採集資料並存放在kafka叢集中
l 資料計算
使用storm編寫程式對日誌進行過濾,將滿足過濾規則的資訊,通過郵件簡訊告警並儲存到資料庫中
l 資料展示
管理頁面可以檢視觸發規則的資訊,系統負責人,聯絡方式,觸發資訊明細等
4、原型設計
產品經理設計產品原形
5、架構設計
5.1、整體架構設計
主要架構為應用+flume+kafka+storm+mysql+Java web。資料流程如下:
1. 應用程式使用log4j產生日誌
2. 部署flume客戶端監控應用程式產生的日誌資訊,併發送到kafka叢集中
3. storm spout拉去kafka的資料進行消費,逐條過濾每條日誌的進行規則判斷,對符合規則的日誌進行郵件告警。
4. 最後將告警的資訊儲存到mysql資料庫中,用來進行管理。
5.2、Flume設計
l Flume說明
Flume是一個分散式、可靠地、可用的服務,用來收集、聚合、傳輸日誌資料。
它是一個基於流式資料的架構,簡單而靈活。具有健壯性、容錯機制、故障轉移、恢復機制。
它提供一個簡單的可擴充套件的資料模型,容許線上分析程式。F
Flume 作為 cloudera 開發的實時日誌收集系統,受到了業界的認可與廣泛應用。
l Flume 設計摘要
使用 Flume EXEC執行一個linux命令來生成資料來源。例如,可以用tail命令監控一個檔案,那麼,只要檔案增加內容,EXEC就可以將增加的內容作為資料來源傳送出去。
使用 org.apache.flume.plugins.KafkaSink,將Flume EXEC產生的資料來源傳送到Kafka中。
5.3、Kafka設計
l Kafka說明
kafka是一個分散式訊息佇列:生產者、消費者的功能。
l Kakfa設計摘要
部署kafka叢集,在叢集中新增一個Topic:monitor_realtime_javaxy
5.4、Storm設計
l KafkaSpout讀取資料,需要配置Topic:monitor_realtime_javaxy
l FilterBolt判斷規則
l NotifyBolt用來發送郵件或簡訊息
l Save2DB用來將告警資訊寫入mysql資料庫
5.5、 資料模型設計
5.5.1、使用者表
用來儲存使用者的資訊,包括賬號、手機號碼、郵箱、是否有效等資訊
5.5.2、應用表
用來儲存應用的資訊,包括應用名稱、應用描述、應用是否線上等資訊
5.5.3、應用型別表
用來儲存應用的型別等資訊
5.5.4、規則表
用來儲存規則的資訊,包括規則名稱,規則描述,規則關鍵詞等資訊
5.5.5、規則記錄表
用來儲存觸發規則後的記錄,包括告警編號、是否簡訊告知、是否郵件告知、告警明細等資訊。
6、 程式碼開發
6.1、 整體結構
6.2、 LogMonitorTopologyMain驅動類
public class LogMonitorTopologyMain { private static Logger logger = Logger.getLogger(LogMonitorTopologyMain.class); public static void main(String[] args) throws Exception{ // 使用TopologyBuilder進行構建驅動類 TopologyBuilder builder = new TopologyBuilder(); // 設定kafka的zookeeper叢集 // BrokerHosts hosts = new ZkHosts("zk01:2181,zk02:2181,zk03:2181"); //// // 初始化配置資訊 // SpoutConfig spoutConfig = new SpoutConfig(hosts, "logmonitor", "/aaa", "log_monitor"); // 在topology中設定spout // builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig),3); builder.setSpout("kafka-spout",new RandomSpout(new StringScheme()),2); builder.setBolt("filter-bolt",new FilterBolt(),3).shuffleGrouping("kafka-spout"); builder.setBolt("prepareRecord-bolt",new PrepareRecordBolt(),2).fieldsGrouping("filter-bolt", new Fields("appId")); builder.setBolt("saveMessage-bolt",new SaveMessage2MySql(),2).shuffleGrouping("prepareRecord-bolt"); //啟動topology的配置資訊 Config topologConf = new Config(); //TOPOLOGY_DEBUG(setDebug), 當它被設定成true的話, storm會記錄下每個元件所發射的每條訊息。 //這在本地環境除錯topology很有用, 但是在線上這麼做的話會影響效能的。 topologConf.setDebug(true); //storm的執行有兩種模式: 本地模式和分散式模式. if (args != null && args.length > 0) { //定義你希望叢集分配多少個工作程序給你來執行這個topology topologConf.setNumWorkers(2); //向叢集提交topology StormSubmitter.submitTopologyWithProgressBar(args[0], topologConf, builder.createTopology()); } else { topologConf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", topologConf, builder.createTopology()); Utils.sleep(10000000); cluster.shutdown(); } } }
6.3、FilterBolt用來過濾日誌資訊
主要是過濾格式和校驗appId是否合法。
public void execute(Tuple input, BasicOutputCollector collector) { //獲取KafkaSpout傳送出來的資料 String line = input.getString(0); //獲取kafka傳送的資料,是一個byte陣列 // byte[] value = (byte[]) input.getValue(0); //將陣列轉化成字串 // String line = new String(value); //對資料進行解析 // appid content //1 error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao Message message = MonitorHandler.parser(line); if (message == null) { return; } if (MonitorHandler.trigger(message)) { collector.emit(new Values(message.getAppId(), message)); } //定時更新規則資訊 MonitorHandler.scheduleLoad(); }
6.4、PrepareRecordBolt傳送郵件告警和簡訊告警
public void execute(Tuple input, BasicOutputCollector collector) { Message message = (Message) input.getValueByField("message"); String appId = input.getStringByField("appId"); //將觸發規則的資訊進行通知 MonitorHandler.notifly(appId, message); Record record = new Record(); try { BeanUtils.copyProperties(record, message); collector.emit(new Values(record)); } catch (Exception e) { } }
6.6、 SaveMessage2MySq儲存到資料庫
public class SaveMessage2MySql extends BaseBasicBolt { private static Logger logger = Logger.getLogger(SaveMessage2MySql.class); public void execute(Tuple input, BasicOutputCollector collector) { Record record = (Record) input.getValueByField("record"); MonitorHandler.save(record); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
6.7、核心類 MonitorHandler 所有流程處理的核心程式碼
public class MonitorHandler { private static Logger logger = Logger.getLogger(MonitorHandler.class); //定義一個map,其中appId為Key,以該appId下的所有rule為Value private static Map<String, List<Rule>> ruleMap; //定義一個map,其中appId為Key,以該appId下的所有user為Value private static Map<String, List<User>> userMap; //定義一個list,用來封裝所有的應用資訊 private static List<App> applist; //定義一個list,用來封裝所有的使用者資訊 private static List<User> userList; //定時載入配置檔案的標識 private static boolean reloaded = false; //定時載入配置檔案的標識 private static long nextReload = 0l; static { load(); } /** * 解析輸入的日誌,將資料按照一定的規則進行分割。 * 判斷日誌是否合法,主要校驗日誌所屬應用的appId是否存在 * * @param line 一條日誌 * @return */ public static Message parser(String line) { //日誌內容分為兩個部分:由5個$$$$$符號作為分隔符,第一部分為appid,第二部分為日誌內容。 String[] messageArr = line.split("\\$\\$\\$\\$\\$"); //對日誌進行校驗 if (messageArr.length != 2) { return null; } if (StringUtils.isBlank(messageArr[0]) || StringUtils.isBlank(messageArr[1])) { return null; } //檢驗當前日誌所屬的appid是否是經過授權的。 if (apppIdisValid(messageArr[0].trim())) { Message message = new Message(); message.setAppId(messageArr[0].trim()); message.setLine(messageArr[1]); return message; } return null; } /** * 驗證appid是否經過授權 */ private static boolean apppIdisValid(String appId) { try { for (App app : applist) { if (app.getId() == Integer.parseInt(appId)) { return true; } } } catch (Exception e) { return false; } return false; } /** * 對日誌進行規制判定,看看是否觸發規則 * @param message * @return */ public static boolean trigger(Message message) { //如果規則模型為空,需要初始化載入規則模型 if (ruleMap == null) { load(); } //從規則模型中獲取當前appid配置的規則 System.out.println(message.getAppId()); List<Rule> keywordByAppIdList = ruleMap.get(message.getAppId()); for (Rule rule : keywordByAppIdList) { //如果日誌中包含過濾過的關鍵詞,即為匹配成功 if (message.getLine().contains(rule.getKeyword())) { message.setRuleId(rule.getId() + ""); message.setKeyword(rule.getKeyword()); return true; } } return false; } /** * 載入資料模型,主要是使用者列表、應用管理表、組合規則模型、組合使用者模型。 */ public static synchronized void load() { if (userList == null) { userList = loadUserList(); } if (applist == null) { applist = loadAppList(); } if (ruleMap == null) { ruleMap = loadRuleMap(); } if (userMap == null) { userMap = loadUserMap(); } } /** * 訪問資料庫獲取所有有效的app列表 * @return */ private static List<App> loadAppList() { return new LogMonitorDao().getAppList(); } /** * 訪問資料庫獲取所有有效使用者的列表 * @return */ private static List<User> loadUserList() { return new LogMonitorDao().getUserList(); } /** * 封裝應用與使用者對應的map * @return */ private static Map<String, List<User>> loadUserMap() { //以應用的appId為key,以應用的所有負責人的userList物件為value。 //HashMap<String, List<User>> HashMap<String, List<User>> map = new HashMap<String, List<User>>(); for (App app : applist) { String userIds = app.getUserId(); List<User> userListInApp = map.get(app.getId()); if (userListInApp == null) { userListInApp = new ArrayList<User>(); map.put(app.getId() + "", userListInApp); } String[] userIdArr = userIds.split(","); for (String userId : userIdArr) { userListInApp.add(queryUserById(userId)); } map.put(app.getId() + "", userListInApp); } return map; } /** * 封裝應用與規則的map * @return */ private static Map<String, List<Rule>> loadRuleMap() { Map<String, List<Rule>> map = new HashMap<String, List<Rule>>(); LogMonitorDao logMonitorDao = new LogMonitorDao(); List<Rule> ruleList = logMonitorDao.getRuleList(); //將代表rule的list轉化成一個map,轉化的邏輯是, // 從rule.getAppId作為map的key,然後將rule物件作為value傳入map //Map<appId,ruleList> 一個appid的規則資訊,儲存在一個list中。 for (Rule rule : ruleList) { List<Rule> ruleListByAppId = map.get(rule.getAppId()+""); if (ruleListByAppId == null) { ruleListByAppId = new ArrayList<Rule>(); map.put(rule.getAppId() + "", ruleListByAppId); } ruleListByAppId.add(rule); map.put(rule.getAppId() + "", ruleListByAppId); } return map; } /** * 通過使用者編號獲取使用者的JavaBean * @param userId * @return */ private static User queryUserById(String userId) { for (User user : userList) { if (user.getId() == Integer.parseInt(userId)) { return user; } } return null; } /** * 通過app編號,獲取當前app的所有負責人列表 * @param appId * @return */ public static List<User> getUserIdsByAppId(String appId) { return userMap.get(appId); } /** * 告警模組,用來發送郵件和簡訊 * 簡訊功能由於簡訊資源匱乏,目前預設返回已傳送。 * @param appId * @param message */ public static void notifly(String appId, Message message) { //通過appId獲取應用負責人的物件 List<User> users = getUserIdsByAppId(appId); //傳送郵件 if (sendMail(appId, users, message)) { message.setIsEmail(1); } //傳送簡訊 if (sendSMS(appId, users, message)) { message.setIsPhone(1); } } /** * 傳送簡訊的模組 * 由於簡訊資源匱乏,目前該功能不開啟,預設true,即簡訊傳送成功。 * 目前傳送簡訊功能使用的是外部介面,外面介面的併發性沒法保證,會影響storm程式執行的效率。 * 後期可以改造為將簡訊資料傳送到外部的訊息隊裡中,然後建立一個worker去傳送簡訊。 * @param appId * @param users * @param message * @return */ private static boolean sendSMS(String appId, List<User> users, Message message) { // return true; List<String> mobileList = new ArrayList<String>(); for (User user : users) { mobileList.add(user.getMobile()); } for (App app : applist) { if (app.getId() == Integer.parseInt(appId.trim())) { message.setAppName(app.getName()); break; } } String content = "系統【" + message.getAppName() + "】在 " + DateUtils.getDateTime() + " 觸發規則 " + message.getRuleId() + ",關鍵字:" + message.getKeyword(); return SMSBase.sendSms(listToStringFormat(mobileList), content); } /** * 傳送郵件 * 後期可以改造為將郵件資料傳送到外部的訊息隊裡中,然後建立一個worker去傳送簡訊。 * @param appId * @param userList * @param message * @return */ private static boolean sendMail(String appId, List<User> userList, Message message) { List<String> receiver = new ArrayList<String>(); for (User user : userList) { receiver.add(user.getEmail()); } for (App app : applist) { if (app.getId() == Integer.parseInt(appId.trim())) { message.setAppName(app.getName()); break; } } if (receiver.size() >= 1) { String date = DateUtils.getDateTime(); String content = "系統【" + message.getAppName() + "】在 " + date + " 觸發規則 " + message.getRuleId() + " ,過濾關鍵字為:" + message.getKeyword() + " 錯誤內容:" + message.getLine(); MailInfo mailInfo = new MailInfo("系統執行日誌監控", content, receiver, null); return MessageSender.sendMail(mailInfo); } return false; } /** * 儲存觸發規則的資訊,將觸發資訊寫入到mysql資料庫中。 * * @param record */ public static void save(Record record) { new LogMonitorDao().saveRecord(record); } /** * 將list轉換為String * @param list * @return */ private static String listToStringFormat(List<String> list) { StringBuilder stringBuilder = new StringBuilder(); for (int i = 0; i < list.size(); i++) { if (i == list.size() - 1) { stringBuilder.append(list.get(i)); } else { stringBuilder.append(list.get(i)).append(","); } } return stringBuilder.toString(); } /** * 配置scheduleLoad重新載入底層資料模型。 */ /** * thread 4 * thread 3 * thread 2 */ public static synchronized void reloadDataModel() { // * thread 1 reloaded = true ----> reloaded = false // * thread 2 reloaded = false // * thread 2 reloaded = false // * thread 2 reloaded = false if (reloaded) { long start = System.currentTimeMillis(); userList = loadUserList(); applist = loadAppList(); ruleMap = loadRuleMap(); userMap = loadUserMap(); reloaded = false; nextReload = 0l; logger.info("配置檔案reload完成,時間:"+DateUtils.getDateTime()+" 耗時:"+ (System.currentTimeMillis()-start)); } } /** * 定時載入配置資訊 * 配合reloadDataModel模組一起使用。 * 主要實現原理如下: * 1,獲取分鐘的資料值,當分鐘資料是10的倍數,就會觸發reloadDataModel方法,簡稱reload時間。 * 2,reloadDataModel方式是執行緒安全的,在當前worker中只有一個執行緒能夠操作。 * 3,為了保證當前執行緒操作完畢之後,其他執行緒不再重複操作,設定了一個識別符號reloaded。 * 在非reload時間段時,reloaded一直被置為true; * 在reload時間段時,第一個執行緒進入reloadDataModel後,載入完畢之後會將reloaded置為false。 */ public static void scheduleLoad() { // String date = DateUtils.getDateTime(); // int now = Integer.parseInt(date.split(":")[1]); // if (now % 10 == 0) {//每10分鐘載入一次 // //1,2,3,4,5,6 // reloadDataModel(); // }else { // reloaded = true; // } if (System.currentTimeMillis()==nextReload){ //thread 1,2,3, reloadDataModel(); } } }
7、 執行結果
郵件傳送