1. 程式人生 > >日誌監控告警系統的設計與實現

日誌監控告警系統的設計與實現

基於的日誌進行監控,監控需要一定規則,對觸發監控規則的日誌資訊進行告警,告警的方式,是簡訊和郵件。

log4j---->error,info,debug 應用程式程式的日誌  error級別 TimeOutException 角標越界IndexXXXException ......Error

com.alibaba.jstorm.daemon.worker.WorkerData]-[INFO] Current worker taskList:[1, 2, 3, 4, 5, 6, 7]

String.contains.(" taskList ")-------------->當訂單量觸發一千萬時,告警通知,讓大家慶祝下。

OrdertotalNum:1000萬

kafaka生成叢集的原理、分割槽

kafka消費者的負載均衡,kfakaSpout

Kafka broker(核心機制,topic,分片,檔案儲存機制)

Redis API學習

spout:從外部資料來源中讀取資料,然後轉換為topology

架構圖:

DataSource:外部資料來源

Spout:接收外部資料來源的元件,將外部資料來源轉化成storm內部的資料,以Tuple為基本的傳輸單元下發給Bolt.

Bolt:接受Spout傳送的資料,或上游的bolt的傳送的資料,根據業務邏輯進行處理,傳送給下一個Bolt或者是儲存到某種介質上,例如Redis。

Tuple:Storm內部中資料傳輸的基本單元,裡面封裝了一個List物件,用來儲存資料。

StreamGroup:資料分鐘策略,7種,shuffleGrouping,Non Grouping,FieldGrouping,Local or ShuffleGrouping.

Nimbus:任務分配

Supervisor:接受任務,並啟動worker,worker的數量是根據埠號來的。

Worker:執行任務的具體元件(JVM),可以執行兩種型別的任務,Spout任務或者bolt任務

Task:一個task屬於一個Spout或者Bolt併發任務。

zk:儲存任務分配的資訊,心跳資訊,元資料資訊。

1、背景知識

一款優秀的軟體需要具備的特點

l 軟體的實用性。

所謂有的放矢,軟體的誕生是為了解決特定的問題,比如現在流行的MVC 框架,早期的沒有MVC 開發的時候,耦合度很大,後期維護更新成本高,難度大,這樣MVC 框架就孕育而生;比如陌陌這種社交軟體,是為了解決陌生人之間交流的問題;比如疼醒這種軟體是為了解決人們遠端溝通的問題;比如OA系統為了解決公司協同流程、專案管理、知識管理等問題……所以一款優秀的軟體必須能夠解決一個領域內的問題。

l 軟體的穩定性。

軟體的實用性問題解決之後,急需要解決的問題就是軟體的穩定性。一般線上系統都會承載企業的某項業務,系統的穩定性直接影響了業務是否能夠正常運營。很多創業公司在前期只注重業務的發展,不太在意系統的穩定性,一旦使用者兩比較大的之後,就會出現很多效能的問題。這種情況就好比,你找了一個妹子,並準備深入交往後結婚,卻發現這個妹子總是有很多異性朋友在聯絡……

l 程式碼的規範性

鐵打的營盤流水的兵,一款優秀的軟體不僅僅是功能的實現。整體架構、功能模組、程式碼註釋、擴充套件性等問題也也需要考慮,畢竟在一個軟體的生命週期過程中,參與的人實在是太多了,主創人員也可能隨時流式。所以程式碼的規範性就難能可貴了。

l 升級保持向前相容性。

如果一個軟體平常使用挺好的,但是升級卻越來越費勁,或者升級後穩定性大打折扣,也難以稱得上一個好的軟體。

l 基本的使用手冊

文件、文件、文件、一個簡單有效的使用手冊,才是程式的王道,知其然才能知其所以然。能讓使用者一目瞭然,功能、架構、設計思路、程式碼等等。

2、需求分析

隨著公司業務發展,支撐公司業務的各種系統越來越多,為了保證公司的業務正常發展,急需要對這些線上系統的執行進行監控,做到問題的及時發現和處理,最大程度減少對業務的影響。

目前系統分類有:

1) 有基於Tomcat的web應用

2) 有獨立的Java Application應用

3) 有執行在linux上的指令碼程式

4) 有大規模的叢集框架(zookeeper、Hadoop、Storm、SRP……)

5) 有作業系統的執行日誌

主要功能需求分為:

監控系統日誌中的內容,按照一定規則進行過濾

發現問題之後通過簡訊和郵件進行告警

3、功能分析

l 資料輸入

使用flume客戶端獲取個系統的資料;

使用者通過頁面輸入系統名稱、負責人觸發規則等資訊

l 資料儲存

使用flume採集資料並存放在kafka叢集中

l 資料計算

使用storm編寫程式對日誌進行過濾,將滿足過濾規則的資訊,通過郵件簡訊告警並儲存到資料庫中

l 資料展示

管理頁面可以檢視觸發規則的資訊,系統負責人,聯絡方式,觸發資訊明細等

4、原型設計

產品經理設計產品原形

5、架構設計

5.1、整體架構設計

主要架構為應用+flume+kafka+storm+mysql+Java web。資料流程如下:

1. 應用程式使用log4j產生日誌

2. 部署flume客戶端監控應用程式產生的日誌資訊,併發送到kafka叢集中

3. storm spout拉去kafka的資料進行消費,逐條過濾每條日誌的進行規則判斷,對符合規則的日誌進行郵件告警。

4. 最後將告警的資訊儲存到mysql資料庫中,用來進行管理。

5.2、Flume設計

l Flume說明

Flume是一個分散式、可靠地、可用的服務,用來收集、聚合、傳輸日誌資料。

它是一個基於流式資料的架構,簡單而靈活。具有健壯性、容錯機制、故障轉移、恢復機制。

它提供一個簡單的可擴充套件的資料模型,容許線上分析程式。F

Flume 作為 cloudera 開發的實時日誌收集系統,受到了業界的認可與廣泛應用。

l Flume 設計摘要

使用 Flume EXEC執行一個linux命令來生成資料來源。例如,可以用tail命令監控一個檔案,那麼,只要檔案增加內容,EXEC就可以將增加的內容作為資料來源傳送出去。

使用 org.apache.flume.plugins.KafkaSink,將Flume EXEC產生的資料來源傳送到Kafka中。

5.3、Kafka設計

l Kafka說明

kafka是一個分散式訊息佇列:生產者、消費者的功能。

l Kakfa設計摘要

部署kafka叢集,在叢集中新增一個Topic:monitor_realtime_javaxy

5.4、Storm設計

l KafkaSpout讀取資料,需要配置Topic:monitor_realtime_javaxy

l FilterBolt判斷規則

l NotifyBolt用來發送郵件或簡訊息

l Save2DB用來將告警資訊寫入mysql資料庫

5.5、 資料模型設計

5.5.1、使用者表

用來儲存使用者的資訊,包括賬號、手機號碼、郵箱、是否有效等資訊

5.5.2、應用表

用來儲存應用的資訊,包括應用名稱、應用描述、應用是否線上等資訊

5.5.3、應用型別表

用來儲存應用的型別等資訊

5.5.4、規則表

用來儲存規則的資訊,包括規則名稱,規則描述,規則關鍵詞等資訊

5.5.5、規則記錄表

用來儲存觸發規則後的記錄,包括告警編號、是否簡訊告知、是否郵件告知、告警明細等資訊。

6、 程式碼開發

6.1、 整體結構

 

6.2、 LogMonitorTopologyMain驅動類

public class LogMonitorTopologyMain {
    private static Logger logger = Logger.getLogger(LogMonitorTopologyMain.class);

    public static void main(String[] args) throws  Exception{
        // 使用TopologyBuilder進行構建驅動類
        TopologyBuilder builder = new TopologyBuilder();

//         設定kafka的zookeeper叢集
//        BrokerHosts hosts = new ZkHosts("zk01:2181,zk02:2181,zk03:2181");
////        // 初始化配置資訊
//        SpoutConfig spoutConfig = new SpoutConfig(hosts, "logmonitor", "/aaa", "log_monitor");
        // 在topology中設定spout
//        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig),3);
        builder.setSpout("kafka-spout",new RandomSpout(new StringScheme()),2);
        builder.setBolt("filter-bolt",new FilterBolt(),3).shuffleGrouping("kafka-spout");
        builder.setBolt("prepareRecord-bolt",new PrepareRecordBolt(),2).fieldsGrouping("filter-bolt", new Fields("appId"));
        builder.setBolt("saveMessage-bolt",new SaveMessage2MySql(),2).shuffleGrouping("prepareRecord-bolt");

        //啟動topology的配置資訊
        Config topologConf = new Config();
        //TOPOLOGY_DEBUG(setDebug), 當它被設定成true的話, storm會記錄下每個元件所發射的每條訊息。
        //這在本地環境除錯topology很有用, 但是在線上這麼做的話會影響效能的。
        topologConf.setDebug(true);
        //storm的執行有兩種模式: 本地模式和分散式模式.
        if (args != null && args.length > 0) {
            //定義你希望叢集分配多少個工作程序給你來執行這個topology
            topologConf.setNumWorkers(2);
            //向叢集提交topology
            StormSubmitter.submitTopologyWithProgressBar(args[0], topologConf, builder.createTopology());
        } else {
            topologConf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", topologConf, builder.createTopology());
         Utils.sleep(10000000);
         cluster.shutdown();
        }
    }
}

6.3、FilterBolt用來過濾日誌資訊

主要是過濾格式和校驗appId是否合法。

public void execute(Tuple input, BasicOutputCollector collector) {
        //獲取KafkaSpout傳送出來的資料
        String line = input.getString(0);
        //獲取kafka傳送的資料,是一個byte陣列
//        byte[] value = (byte[]) input.getValue(0);
        //將陣列轉化成字串
//        String line = new String(value);
        //對資料進行解析
        // appid   content
        //1  error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao
        Message message = MonitorHandler.parser(line);
        if (message == null) {
            return;
        }
        if (MonitorHandler.trigger(message)) {
            collector.emit(new Values(message.getAppId(), message));
        }
        //定時更新規則資訊
        MonitorHandler.scheduleLoad();
    }

6.4、PrepareRecordBolt傳送郵件告警和簡訊告警

public void execute(Tuple input, BasicOutputCollector collector) {
    Message message = (Message) input.getValueByField("message");
    String appId = input.getStringByField("appId");
    //將觸發規則的資訊進行通知
    MonitorHandler.notifly(appId, message);
    Record record = new Record();
    try {
        BeanUtils.copyProperties(record, message);
        collector.emit(new Values(record));
    } catch (Exception e) {

    }
}

6.6、 SaveMessage2MySq儲存到資料庫

public class SaveMessage2MySql extends BaseBasicBolt {
    private static Logger logger = Logger.getLogger(SaveMessage2MySql.class);
    public void execute(Tuple input, BasicOutputCollector collector) {
        Record record = (Record) input.getValueByField("record");
        MonitorHandler.save(record);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

6.7、核心類 MonitorHandler  所有流程處理的核心程式碼

public class MonitorHandler {

    private static Logger logger = Logger.getLogger(MonitorHandler.class);
    //定義一個map,其中appId為Key,以該appId下的所有rule為Value
    private static Map<String, List<Rule>> ruleMap;
    //定義一個map,其中appId為Key,以該appId下的所有user為Value
    private static Map<String, List<User>> userMap;
    //定義一個list,用來封裝所有的應用資訊
    private static List<App> applist;
    //定義一個list,用來封裝所有的使用者資訊
    private static List<User> userList;
    //定時載入配置檔案的標識
    private static boolean reloaded = false;
    //定時載入配置檔案的標識
    private static long nextReload = 0l;

    static {
        load();
    }

    /**
     * 解析輸入的日誌,將資料按照一定的規則進行分割。
     * 判斷日誌是否合法,主要校驗日誌所屬應用的appId是否存在
     *
     * @param line 一條日誌
     * @return
*/
public static Message parser(String line) {
        //日誌內容分為兩個部分:由5個$$$$$符號作為分隔符,第一部分為appid,第二部分為日誌內容。
        String[] messageArr = line.split("\\$\\$\\$\\$\\$");
        //對日誌進行校驗
        if (messageArr.length != 2) {
            return null;
        }
        if (StringUtils.isBlank(messageArr[0]) || StringUtils.isBlank(messageArr[1])) {
            return null;
        }
        //檢驗當前日誌所屬的appid是否是經過授權的。
        if (apppIdisValid(messageArr[0].trim())) {
            Message message = new Message();
            message.setAppId(messageArr[0].trim());
            message.setLine(messageArr[1]);
            return message;
        }
        return null;
    }

    /**
     * 驗證appid是否經過授權
     */
private static boolean apppIdisValid(String appId) {
        try {
            for (App app : applist) {
                if (app.getId() == Integer.parseInt(appId)) {
                    return true;
                }
            }
        } catch (Exception e) {
            return false;
        }
        return false;
    }

    /**
     * 對日誌進行規制判定,看看是否觸發規則
     * @param message
* @return
*/
public static boolean trigger(Message message) {
        //如果規則模型為空,需要初始化載入規則模型
        if (ruleMap == null) {
            load();
        }
        //從規則模型中獲取當前appid配置的規則
        System.out.println(message.getAppId());
        List<Rule> keywordByAppIdList = ruleMap.get(message.getAppId());
        for (Rule rule : keywordByAppIdList) {
            //如果日誌中包含過濾過的關鍵詞,即為匹配成功
            if (message.getLine().contains(rule.getKeyword())) {
                message.setRuleId(rule.getId() + "");
                message.setKeyword(rule.getKeyword());
                return true;
            }
        }
        return false;
    }

    /**
     * 載入資料模型,主要是使用者列表、應用管理表、組合規則模型、組合使用者模型。
     */
public static synchronized void load() {
        if (userList == null) {
            userList = loadUserList();
        }
        if (applist == null) {
            applist = loadAppList();
        }
        if (ruleMap == null) {
            ruleMap = loadRuleMap();
        }
        if (userMap == null) {
            userMap = loadUserMap();
        }
    }

    /**
     * 訪問資料庫獲取所有有效的app列表
     * @return
*/
private static List<App> loadAppList() {
        return new LogMonitorDao().getAppList();
    }

    /**
     * 訪問資料庫獲取所有有效使用者的列表
     * @return
*/
private static List<User> loadUserList() {
        return new LogMonitorDao().getUserList();
    }

    /**
     * 封裝應用與使用者對應的map
     * @return
*/
private static Map<String, List<User>> loadUserMap() {
        //以應用的appId為key,以應用的所有負責人的userList物件為value。
        //HashMap<String, List<User>>
        HashMap<String, List<User>> map = new HashMap<String, List<User>>();
        for (App app : applist) {
            String userIds = app.getUserId();
            List<User> userListInApp = map.get(app.getId());
            if (userListInApp == null) {
                userListInApp = new ArrayList<User>();
                map.put(app.getId() + "", userListInApp);
            }
            String[] userIdArr = userIds.split(",");
            for (String userId : userIdArr) {
                userListInApp.add(queryUserById(userId));
            }
            map.put(app.getId() + "", userListInApp);
        }
        return map;
    }

    /**
     *  封裝應用與規則的map
     * @return
*/
private static Map<String, List<Rule>> loadRuleMap() {
        Map<String, List<Rule>> map = new HashMap<String, List<Rule>>();
        LogMonitorDao logMonitorDao = new LogMonitorDao();
        List<Rule> ruleList = logMonitorDao.getRuleList();
        //將代表rule的list轉化成一個map,轉化的邏輯是,
        // 從rule.getAppId作為map的key,然後將rule物件作為value傳入map
        //Map<appId,ruleList>  一個appid的規則資訊,儲存在一個list中。
        for (Rule rule : ruleList) {
            List<Rule> ruleListByAppId = map.get(rule.getAppId()+"");
            if (ruleListByAppId == null) {
                ruleListByAppId = new ArrayList<Rule>();
                map.put(rule.getAppId() + "", ruleListByAppId);
            }
            ruleListByAppId.add(rule);
            map.put(rule.getAppId() + "", ruleListByAppId);
        }
        return map;
    }

    /**
     * 通過使用者編號獲取使用者的JavaBean
     * @param userId
* @return
*/
private static User queryUserById(String userId) {
        for (User user : userList) {
            if (user.getId() == Integer.parseInt(userId)) {
                return user;
            }
        }
        return null;
    }

    /**
     * 通過app編號,獲取當前app的所有負責人列表
     * @param appId
* @return
*/
public static List<User> getUserIdsByAppId(String appId) {
        return userMap.get(appId);
    }

    /**
     * 告警模組,用來發送郵件和簡訊
     * 簡訊功能由於簡訊資源匱乏,目前預設返回已傳送。
     * @param appId
* @param message
*/
public static void notifly(String appId, Message message) {
        //通過appId獲取應用負責人的物件
        List<User> users = getUserIdsByAppId(appId);
        //傳送郵件
        if (sendMail(appId, users, message)) {
            message.setIsEmail(1);
        }
        //傳送簡訊
        if (sendSMS(appId, users, message)) {
            message.setIsPhone(1);
        }
    }

    /**
     * 傳送簡訊的模組
     * 由於簡訊資源匱乏,目前該功能不開啟,預設true,即簡訊傳送成功。
     * 目前傳送簡訊功能使用的是外部介面,外面介面的併發性沒法保證,會影響storm程式執行的效率。
     *  後期可以改造為將簡訊資料傳送到外部的訊息隊裡中,然後建立一個worker去傳送簡訊。
     * @param appId
* @param users
* @param message
* @return
*/
private static boolean sendSMS(String appId, List<User> users, Message message) {
//        return true;
        List<String> mobileList = new ArrayList<String>();
        for (User user : users) {
            mobileList.add(user.getMobile());
        }
        for (App app : applist) {
            if (app.getId() == Integer.parseInt(appId.trim())) {
                message.setAppName(app.getName());
                break;
            }
        }
        String content = "系統【" + message.getAppName() + "】在 " + DateUtils.getDateTime() + " 觸發規則 " + message.getRuleId() + ",關鍵字:" + message.getKeyword();
        return SMSBase.sendSms(listToStringFormat(mobileList), content);
    }

    /**
     * 傳送郵件
     * 後期可以改造為將郵件資料傳送到外部的訊息隊裡中,然後建立一個worker去傳送簡訊。
     * @param appId
* @param userList
* @param message
* @return
*/
private static boolean sendMail(String appId, List<User> userList, Message message) {
        List<String> receiver = new ArrayList<String>();
        for (User user : userList) {
            receiver.add(user.getEmail());
        }
        for (App app : applist) {
            if (app.getId() == Integer.parseInt(appId.trim())) {
                message.setAppName(app.getName());
                break;
            }
        }
        if (receiver.size() >= 1) {
            String date = DateUtils.getDateTime();
            String content = "系統【" + message.getAppName() + "】在 " + date + " 觸發規則 " + message.getRuleId() + " ,過濾關鍵字為:" + message.getKeyword() + "  錯誤內容:" + message.getLine();
            MailInfo mailInfo = new MailInfo("系統執行日誌監控", content, receiver, null);
            return MessageSender.sendMail(mailInfo);
        }
        return false;
    }

    /**
     * 儲存觸發規則的資訊,將觸發資訊寫入到mysql資料庫中。
     *
     * @param record
*/
public static void save(Record record) {
        new LogMonitorDao().saveRecord(record);
    }

    /**
     * 將list轉換為String
     * @param list
* @return
*/
private static String listToStringFormat(List<String> list) {
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < list.size(); i++) {
            if (i == list.size() - 1) {
                stringBuilder.append(list.get(i));
            } else {
                stringBuilder.append(list.get(i)).append(",");
            }
        }
        return stringBuilder.toString();
    }

    /**
     * 配置scheduleLoad重新載入底層資料模型。
     */
    /**
     * thread 4
     * thread 3
     * thread 2
     */
public static synchronized void reloadDataModel() {
//        * thread 1  reloaded = true   ----> reloaded = false
//        * thread 2  reloaded = false
//        * thread 2  reloaded = false
 //        * thread 2  reloaded = false
        if (reloaded) {
            long start = System.currentTimeMillis();
            userList = loadUserList();
            applist = loadAppList();
            ruleMap = loadRuleMap();
            userMap = loadUserMap();
            reloaded = false;
            nextReload = 0l;
            logger.info("配置檔案reload完成,時間:"+DateUtils.getDateTime()+" 耗時:"+ (System.currentTimeMillis()-start));
        }


    }

    /**
     * 定時載入配置資訊
     * 配合reloadDataModel模組一起使用。
     * 主要實現原理如下:
     * 1,獲取分鐘的資料值,當分鐘資料是10的倍數,就會觸發reloadDataModel方法,簡稱reload時間。
     * 2,reloadDataModel方式是執行緒安全的,在當前worker中只有一個執行緒能夠操作。
     * 3,為了保證當前執行緒操作完畢之後,其他執行緒不再重複操作,設定了一個識別符號reloaded。
     *      在非reload時間段時,reloaded一直被置為true;
     *      在reload時間段時,第一個執行緒進入reloadDataModel後,載入完畢之後會將reloaded置為false。
     */
public static void scheduleLoad() {
//        String date = DateUtils.getDateTime();
//        int now = Integer.parseInt(date.split(":")[1]);
//        if (now % 10 == 0) {//每10分鐘載入一次
//            //1,2,3,4,5,6
//            reloadDataModel();
//        }else {
//            reloaded = true;
//        }

        if (System.currentTimeMillis()==nextReload){
            //thread 1,2,3,
            reloadDataModel();
        }


    }
}

7、 執行結果

郵件傳送