flume監控與監控原理
Flume監控流程
首先在flume-ng-node中org.apache.flume.node.Application的main方法中,有兩個方法分別是startAllComponents()和startAllComponents(conf)方法
其中startAllComponents(conf)方法有一個this.loadMonitoring();來啟動監控方法loadMonitoring()
|
loadMonitoring()方法
|
其中monitorServer.configure(context);來載入監控服務的配置資訊,monitorServer.start();啟動監控服務。
這裡的monitorServer就會有兩種:GangliaServer和HTTPMetricsServer,他們都實現了MonitorService這個介面。這裡我們只追蹤HTTPMetricsServer。
其中會初始化一個jettyServer來提供監控資料的訪問服務,裡面的核心方法還是handle方法,定義了監控資料訪問的url,這裡的url就是獲取監控json格
式資料的http地址。通過原始碼我們可以看到 metricsMap = JMXPollUtil.getAllMBeans();具體的資料都是從這條語句得來的,再仔細看可以得知,這些監
控資料是同JMX的方式得到的。除了以上的原始碼,我們需要關注以外,我們還需要關注具體監控元件的原始碼,這些原始碼都是在flume-ng-core中的org.apache.flume.instrumentation包下面,所有的監控元件都會繼承MonitoredCounterGroup實現xxxCounterMBean介面,MonitoredCounterGroup中定義了一些基本公有的監控屬性,xxxCounterMBean定義了獲取監控元素的方法介面,具體實現還是在監控元件中實現。
監控指標資料的流向圖
http監控
簡介
Flume可以通過HTTP以JSON形式報告metrics,啟用HTTP監控,內部會啟動一個jetty服務,Flume需要配置一個埠,flume預設的埠是41414
這個在http監控元件的原始碼中可以看到
public class HTTPMetricsServer implements MonitorService {
private Server jettyServer;
private int port;
private static Logger LOG = LoggerFactory.getLogger(HTTPMetricsServer.class);
public static int DEFAULT_PORT = 41414;
public static String CONFIG_PORT = "port";
|
使用方式
如果僅僅想檢視flume執行時的相關的資料量,則使用http這種監控方式,只需要在啟動flume的時候在啟動引數上面加上監控配置,例如這樣:
#!/bin/sh
nohup /data/server/flume-1.8.0/bin/flume-ng agent -c /data/server/flume-1.8.0/conf -f /data/server/flume-1.8.0/conf/kafka_channel.conf -n a0 -Dflume.monitoring.type=http -Dflume.monitoring.port=5653 -Dflume.root.logger=INFO,console> /data/server/flume-1.8.0/conf/flume-client.log &
|
其中-Dflume.monitoring.type=http表示使用http方式來監控,後面的-Dflume.monitoring.port=1234表示我們需要啟動的監控服務的埠號為5653,這個埠號可以自己隨意配置。然後啟動flume之後,通過http://ip:5653/metrics就可以得到flume的一個json格式的監控資料.
注意1:41414是flume監控的預設埠,在配置啟動引數時可以更改這個引數,按照自己的需求更改這個埠。
json資料網頁顯示
不同的瀏覽器顯示的方式和結果也不一樣,比如360瀏覽器會讓你下載的metrics檔案中有json格式的資料,無法重新整理,體驗感極差。Google Chrome瀏覽器會將該json資料直接顯示到網頁上,字型非常小,也沒有一定的顯示格式。而Firefox瀏覽器會將json處理後再顯示到網頁上,很形象和友好,看相關監控指標資料很直觀,目前是感覺顯示效果最好的瀏覽器。
下面展示不同瀏覽器展示json資料的區別:
Google Chrome瀏覽器
火狐瀏覽器:
對比效果一目瞭然,推薦使用火狐瀏覽器檢視flume監控指標
flume重啟監控指標變化
只要Flume不重啟服務這裡就會一直做增量的變更,因為所有的監控指標其實就是一個累加器,flume執行採集資料,相關監控指標就會一直做加1的操作。這個在原始碼也能看出來。
如果flume服務掛了,重啟後所有的指標就又從0開始計數。
監控指標的各個含義
監控程式的書寫
通過上面的介紹,我們已經知道flume監控原理和流程,flume執行時會啟動一個jetty服務,把相關的監控指標通過HTTP以JSON形式報告metrics,同理我們也可以通過java程式獲取這個json字串,從而得到相關的監控指標。通過邏輯判斷,獲知flume執行的問題,觸發相關條件就以發簡訊或郵件的形式報警,及時解決flume出現的問題。
比如當前的報警條件是:連線不上服務,或者獲取的json字串為空,tailDirsource和kafkaChannel處理的資料量相差超過1000,則進行報警,程式碼如下。
public class myThread extends Thread { private String URL; public myThread(String URL) { this.URL = URL; } @Override public void run() { long timeInterval = 10000; int i = 0; //獲取配置檔案中的ip String[] str = URL.split(":"); String ip = str[0]; Properties prop = getPropInfo.getProp(); while (true) { System.out.println("迴圈呼叫 !!! 時間=" + new Date()); try { String s=""; //傳送 GET 請求,獲取連線 sendGet sendGet = new sendGet(); s = sendGet.sendGet(URL); //獲取flume監控指標的json字串 JSONObject jsonObject = JSON.parseObject(s); //獲取各個監控指標 JSONObject channel = jsonObject.getJSONObject("CHANNEL.c1"); JSONObject source = jsonObject.getJSONObject("SOURCE.r1"); //成功寫入channel且提交的日誌總數量 Object channelCount = channel.get("EventPutSuccessCount"); //目前為止source已經接收到的日誌總數量 Object sourceReceivedCount = source.get("EventReceivedCount"); //成功寫出到channel的日誌總數量 Object sourceAcceptedCount = source.get("EventAcceptedCount"); int intChannelCount = Integer.parseInt(channelCount.toString()); int intSourceCount = Integer.parseInt(sourceAcceptedCount.toString()); int data1 = intSourceCount - intChannelCount; if (s != null && s != "" &&(data1<1000)) { System.out.println("成功寫入channel且提交的日誌總數量: " + channelCount); System.out.println("目前為止source已經接收到的日誌總數量: " + sourceReceivedCount); System.out.println("成功寫出到channel的日誌總數量: " + sourceAcceptedCount); } else { long timestamps = System.currentTimeMillis(); System.out.println(timestamps+"============================================================"); String times = new SimpleDateFormat("").format(new Date(timestamps)); //傳送報警郵件 SendMailFunction.errorSendFunction(times, prop.getProperty("mailContent"), "/data/src/flume_data_monitor/flume.properies"); //傳送報警簡訊 SendMessages sendMessages = new SendMessages(); sendMessages.sendMessage(prop.getProperty("phone1"),"伺服器:"+"【"+ip+"】"+prop.getProperty("messageContent") ); sendMessages.sendMessage(prop.getProperty("phone2"),"伺服器:"+"【"+ip+"】"+prop.getProperty("messageContent")); sendMessages.sendMessage(prop.getProperty("phone3"),"伺服器:"+"【"+ip+"】"+prop.getProperty("messageContent")); System.out.println(times+"============================================================"); //出現異常時的各個指標的資料量 System.out.println("成功寫入channel且提交的日誌總數量: " + channelCount); System.out.println("目前為止source已經接收到的日誌總數量: " + sourceReceivedCount); System.out.println("成功寫出到channel的日誌總數量: " + sourceAcceptedCount); //如果出現異常,等待五分鐘處理時間,五分鐘處理不好繼續傳送警告 Thread.sleep(1000*60*5); i++; } Thread.sleep(timeInterval); } catch (Exception e) { e.printStackTrace(); } //超過三次報警跳出迴圈結束報警 if(i>3){ break; } } }