1. 程式人生 > >flume監控與監控原理

flume監控與監控原理

Flume監控流程

首先在flume-ng-node中org.apache.flume.node.Application的main方法中,有兩個方法分別是startAllComponents()和startAllComponents(conf)方法

其中startAllComponents(conf)方法有一個this.loadMonitoring();來啟動監控方法loadMonitoring()

 

loadMonitoring()方法

private void loadMonitoring() {

  Properties systemProps = System.getProperties();

  Set<String> keys = systemProps.stringPropertyNames();

  try {

    if (keys.contains(CONF_MONITOR_CLASS)) {

      String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);

      Class<? extends MonitorService> klass;

      try {

              klass = MonitoringType.valueOf(

            monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();

      } catch (Exception e) {

              klass = (Class<? extends MonitorService>) Class.forName(monitorType);

      }

      this.monitorServer = klass.newInstance();

      Context context = new Context();

      for (String key : keys) {

        if (key.startsWith(CONF_MONITOR_PREFIX)) {

          context.put(key.substring(CONF_MONITOR_PREFIX.length()),

              systemProps.getProperty(key));

        }

      }

          monitorServer.configure(context);

            monitorServer.start();

    }

  } catch (Exception e) {

    logger.warn("Error starting monitoring. "

        + "Monitoring might not be available.", e);

  }



}

 

其中monitorServer.configure(context);來載入監控服務的配置資訊,monitorServer.start();啟動監控服務。
這裡的monitorServer就會有兩種:GangliaServer和HTTPMetricsServer,他們都實現了MonitorService這個介面。這裡我們只追蹤HTTPMetricsServer。

其中會初始化一個jettyServer來提供監控資料的訪問服務,裡面的核心方法還是handle方法,定義了監控資料訪問的url,這裡的url就是獲取監控json格

式資料的http地址。通過原始碼我們可以看到 metricsMap = JMXPollUtil.getAllMBeans();具體的資料都是從這條語句得來的,再仔細看可以得知,這些監

控資料是同JMX的方式得到的。除了以上的原始碼,我們需要關注以外,我們還需要關注具體監控元件的原始碼,這些原始碼都是在flume-ng-core中的org.apache.flume.instrumentation包下面,所有的監控元件都會繼承MonitoredCounterGroup實現xxxCounterMBean介面,MonitoredCounterGroup中定義了一些基本公有的監控屬性,xxxCounterMBean定義了獲取監控元素的方法介面,具體實現還是在監控元件中實現。

監控指標資料的流向圖

http監控

簡介

Flume可以通過HTTP以JSON形式報告metrics,啟用HTTP監控,內部會啟動一個jetty服務,Flume需要配置一個埠,flume預設的埠是41414

這個在http監控元件的原始碼中可以看到


public class HTTPMetricsServer implements MonitorService {



  private Server jettyServer;

  private int port;

  private static Logger LOG = LoggerFactory.getLogger(HTTPMetricsServer.class);

  public static int DEFAULT_PORT = 41414;

  public static String CONFIG_PORT = "port";

 

使用方式

如果僅僅想檢視flume執行時的相關的資料量,則使用http這種監控方式,只需要在啟動flume的時候在啟動引數上面加上監控配置,例如這樣:


#!/bin/sh      

nohup /data/server/flume-1.8.0/bin/flume-ng agent -c /data/server/flume-1.8.0/conf  -f /data/server/flume-1.8.0/conf/kafka_channel.conf -n a0 -Dflume.monitoring.type=http -Dflume.monitoring.port=5653  -Dflume.root.logger=INFO,console> /data/server/flume-1.8.0/conf/flume-client.log &

 

其中-Dflume.monitoring.type=http表示使用http方式來監控,後面的-Dflume.monitoring.port=1234表示我們需要啟動的監控服務的埠號為5653,這個埠號可以自己隨意配置。然後啟動flume之後,通過http://ip:5653/metrics就可以得到flume的一個json格式的監控資料.

注意1:41414是flume監控的預設埠,在配置啟動引數時可以更改這個引數,按照自己的需求更改這個埠。

json資料網頁顯示

不同的瀏覽器顯示的方式和結果也不一樣,比如360瀏覽器會讓你下載的metrics檔案中有json格式的資料,無法重新整理,體驗感極差。Google Chrome瀏覽器會將該json資料直接顯示到網頁上,字型非常小,也沒有一定的顯示格式。而Firefox瀏覽器會將json處理後再顯示到網頁上,很形象和友好,看相關監控指標資料很直觀,目前是感覺顯示效果最好的瀏覽器。

下面展示不同瀏覽器展示json資料的區別:

Google Chrome瀏覽器

火狐瀏覽器:

對比效果一目瞭然,推薦使用火狐瀏覽器檢視flume監控指標

flume重啟監控指標變化

只要Flume不重啟服務這裡就會一直做增量的變更,因為所有的監控指標其實就是一個累加器,flume執行採集資料,相關監控指標就會一直做加1的操作。這個在原始碼也能看出來。

如果flume服務掛了,重啟後所有的指標就又從0開始計數。

監控指標的各個含義

監控程式的書寫

通過上面的介紹,我們已經知道flume監控原理和流程,flume執行時會啟動一個jetty服務,把相關的監控指標通過HTTP以JSON形式報告metrics,同理我們也可以通過java程式獲取這個json字串,從而得到相關的監控指標。通過邏輯判斷,獲知flume執行的問題,觸發相關條件就以發簡訊或郵件的形式報警,及時解決flume出現的問題。

比如當前的報警條件是:連線不上服務,或者獲取的json字串為空,tailDirsource和kafkaChannel處理的資料量相差超過1000,則進行報警,程式碼如下。

public class myThread extends Thread {
    private  String URL;
    public myThread(String URL) {
        this.URL = URL;
    }
    @Override
    public void run() {
        long timeInterval = 10000;
        int i = 0;
        //獲取配置檔案中的ip
        String[] str = URL.split(":");
        String ip = str[0];
        Properties prop = getPropInfo.getProp();
        while (true) {
            System.out.println("迴圈呼叫 !!!  時間=" + new Date());
            try {
            String s="";
            //傳送 GET 請求,獲取連線
            sendGet sendGet = new sendGet();
            s = sendGet.sendGet(URL);
            //獲取flume監控指標的json字串
            JSONObject jsonObject = JSON.parseObject(s);
            //獲取各個監控指標
            JSONObject channel = jsonObject.getJSONObject("CHANNEL.c1");
            JSONObject source = jsonObject.getJSONObject("SOURCE.r1");
            //成功寫入channel且提交的日誌總數量
            Object channelCount = channel.get("EventPutSuccessCount");
            //目前為止source已經接收到的日誌總數量
            Object sourceReceivedCount = source.get("EventReceivedCount");
            //成功寫出到channel的日誌總數量
            Object sourceAcceptedCount = source.get("EventAcceptedCount");

            int intChannelCount = Integer.parseInt(channelCount.toString());
            int intSourceCount = Integer.parseInt(sourceAcceptedCount.toString());
            int data1 = intSourceCount - intChannelCount;
            if (s != null && s != ""  &&(data1<1000)) {
                System.out.println("成功寫入channel且提交的日誌總數量: " + channelCount);
                System.out.println("目前為止source已經接收到的日誌總數量: " + sourceReceivedCount);
                System.out.println("成功寫出到channel的日誌總數量: " + sourceAcceptedCount);
            } else {
            long timestamps = System.currentTimeMillis();
            System.out.println(timestamps+"============================================================");
            String times = new SimpleDateFormat("").format(new Date(timestamps));
            //傳送報警郵件
            SendMailFunction.errorSendFunction(times, prop.getProperty("mailContent"), "/data/src/flume_data_monitor/flume.properies");
            //傳送報警簡訊
            SendMessages sendMessages = new SendMessages();
            sendMessages.sendMessage(prop.getProperty("phone1"),"伺服器:"+"【"+ip+"】"+prop.getProperty("messageContent") );
            sendMessages.sendMessage(prop.getProperty("phone2"),"伺服器:"+"【"+ip+"】"+prop.getProperty("messageContent"));
            sendMessages.sendMessage(prop.getProperty("phone3"),"伺服器:"+"【"+ip+"】"+prop.getProperty("messageContent"));
            System.out.println(times+"============================================================");
            //出現異常時的各個指標的資料量
            System.out.println("成功寫入channel且提交的日誌總數量: " + channelCount);
            System.out.println("目前為止source已經接收到的日誌總數量: " + sourceReceivedCount);
            System.out.println("成功寫出到channel的日誌總數量: " + sourceAcceptedCount);
            //如果出現異常,等待五分鐘處理時間,五分鐘處理不好繼續傳送警告
            Thread.sleep(1000*60*5);
            i++;
            }
            Thread.sleep(timeInterval);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //超過三次報警跳出迴圈結束報警
        if(i>3){
            break;
        }
        }
    }