
Custom metrics with Ganglia monitoring in practice

Ganglia is a monitoring system open-sourced by UC Berkeley, designed from the start for monitoring distributed clusters. It covers both the resource level and the business level: the resource level includes CPU, memory, disk, I/O, network load, and so on, while at the business level users can easily add custom metrics, so it can monitor things such as service performance, load, and error rates, for example a web service's QPS or its HTTP status error rate. In addition, when integrated with Nagios it can trigger alerts whenever a metric crosses a given threshold.

Compared with Zabbix, Ganglia's advantage is that its client-side collection agent (gmond) has very low system overhead and does not affect the performance of the services being monitored.

Ganglia consists of a few main components:

  • gmond: deployed on every monitored machine; periodically collects local data and broadcasts or unicasts it.
  • gmetad: deployed on the server side; periodically pulls the data collected by gmond from each host listed in its configured data_source entries (a minimal configuration sketch follows this list).
  • ganglia-web: presents the collected monitoring data on web pages.
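
To make the division of labour concrete, a minimal configuration sketch might look like the following; the cluster name, host names, and polling interval are placeholders, not values from the original setup:

# /etc/ganglia/gmetad.conf on the server: poll these nodes every 15 seconds
data_source "test_cluster" 15 node1.example.com:8649 node2.example.com:8649

# /etc/ganglia/gmond.conf on each monitored node: the cluster name must match
cluster {
  name = "test_cluster"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}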

This article does not cover Ganglia installation in detail; see: http://www.it165.net/admin/html/201302/770.html

Instead, it focuses on how to develop custom metrics so you can monitor the indicators you actually care about.

There are several main approaches:

1. Use gmetric directly

Machines with gmond installed also get /usr/bin/gmetric, a command that broadcasts a metric's name, value, and other attributes. For example:

/usr/bin/gmetric -c /etc/ganglia/gmond.conf --name=test --type=int32 --units=sec --value=2    

Besides invoking gmetric from the command line, you can also use bindings for common languages such as Go, Java, and Python; the relevant bindings are available on GitHub and only need to be imported. Go binding: https://github.com/ganglia/ganglia_contrib/tree/master/ganglia-go
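
If a full binding feels heavyweight, a thin wrapper that simply shells out to gmetric is often enough. The sketch below is a minimal, hypothetical example; the paths, metric name, and value are placeholders, not part of the original article.

# -*- coding: utf-8 -*-
# Minimal sketch: publish a metric by shelling out to /usr/bin/gmetric.
import subprocess

GMETRIC = '/usr/bin/gmetric'            # path assumed from the example above
GMOND_CONF = '/etc/ganglia/gmond.conf'  # same config file as the example above

def send_metric(name, value, metric_type='int32', units=''):
    '''Broadcast a single metric via the gmetric command-line tool.'''
    cmd = [GMETRIC,
           '-c', GMOND_CONF,
           '--name', name,
           '--type', metric_type,
           '--units', units,
           '--value', str(value)]
    # raises CalledProcessError if gmetric exits with a non-zero status
    subprocess.check_call(cmd)

if __name__ == '__main__':
    # equivalent to the command-line example above
    send_metric('test', 2, metric_type='int32', units='sec')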

2. Use a third-party tool built on gmetric

This tool, ganglia-logtailer, is based on the logtail (Debian) / logcheck (CentOS) package and tails log files on a schedule. It then parses the log with the class you specify via classname, computes custom metrics from the fields you care about, and broadcasts them with gmetric.
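
For reference, the regular expression in the modified NginxLogtailer.py below expects lines of the form remote_ip server_name user [date] "request" status size "referrer" "user_agent" "forwarded_for" "request_time". An nginx log_format along the following lines would produce matching entries; the exact variable names are an assumption inferred from the regex, not taken from the original article, and the production format may differ:

# Hypothetical nginx log_format matching the regex in NginxLogtailer.py below
log_format ganglia_logtailer '$remote_addr $server_name $remote_user [$time_local] '
                             '"$request" $status $body_bytes_sent "$http_referer" '
                             '"$http_user_agent" "$http_x_forwarded_for" "$request_time"';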

For example, based on the nginx log format of our own service, we modify NginxLogtailer.py as follows:


# -*- coding: utf-8 -*-
###
###  This plugin for logtailer will crunch nginx logs and produce these metrics:
###    * hits per second
###    * GETs per second
###    * average query processing time
###    * ninetieth percentile query processing time
###    * number of HTTP 200, 300, 400, and 500 responses per second
###
###  Note that this plugin depends on a certain nginx log format, documented in
##   __init__.
import time
import threading
import re
# local dependencies
from ganglia_logtailer_helper import GangliaMetricObject
from ganglia_logtailer_helper import LogtailerParsingException, LogtailerStateException
class NginxLogtailer(object):
    # only used in daemon mode
    period = 30
    def __init__(self):
        '''This function should initialize any data structures or variables
        needed for the internal state of the line parser.'''
        self.reset_state()
        self.lock = threading.RLock()
        # this is what will match the nginx lines
        #log_format ganglia-logtailer
        #    '$host '
        #    '$server_addr '
        #    '$remote_addr '
        #    '- '
        #    '"$time_iso8601" '
        #    '$status '
        #    '$body_bytes_sent '
        #    '$request_time '
        #    '"$http_referer" '
        #    '"$request" '
        #    '"$http_user_agent" '
        #    '$pid';
        # NOTE: nginx 0.7 doesn't support $time_iso8601, use $time_local
        # instead
        # original apache log format string:
        # %v %A %a %u %{%Y-%m-%dT%H:%M:%S}t %c %s %>s %B %D \"%{Referer}i\" \"%r\" \"%{User-Agent}i\" %P
        # host.com 127.0.0.1 127.0.0.1 - "2008-05-08T07:34:44" - 200 200 371 103918 - "-" "GET /path HTTP/1.0" "-" 23794
        # match keys: server_name, local_ip, remote_ip, date, status, size,
        #               req_time, referrer, request, user_agent, pid
        self.reg = re.compile('^(?P<remote_ip>[^ ]+) (?P<server_name>[^ ]+) (?P<hit>[^ ]+) \[(?P<date>[^\]]+)\] "(?P<request>[^"]+)" (?P<status>[^ ]+) (?P<size>[^ ]+) "(?P<referrer>[^"]+)" "(?P<user_agent>[^"]+)" "(?P<forward_to>[^"]+)" "(?P<req_time>[^"]+)"')
        # assume we're in daemon mode unless set_check_duration gets called
        self.dur_override = False

    # example function for parse line
    # takes one argument (text) line to be parsed
    # returns nothing
    def parse_line(self, line):
        '''This function should digest the contents of one line at a time,
        updating the internal state variables.'''
        self.lock.acquire()
        try:
            regMatch = self.reg.match(line)
            if regMatch:
                linebits = regMatch.groupdict()
                if '-' == linebits['request'] or 'file2get' in linebits['request']:
                    self.lock.release()
                    return
                self.num_hits+=1
                # capture GETs
                if( 'GET' in linebits['request'] ):
                    self.num_gets+=1
                # capture HTTP response code
                rescode = float(linebits['status'])
                if( (rescode >= 200) and (rescode < 300) ):
                    self.num_two+=1
                elif( (rescode >= 300) and (rescode < 400) ):
                    self.num_three+=1
                elif( (rescode >= 400) and (rescode < 500) ):
                    self.num_four+=1
                elif( (rescode >= 500) and (rescode < 600) ):
                    self.num_five+=1
                # capture request duration
                dur = float(linebits['req_time'])
                self.req_time += dur
                # store for 90th % calculation
                self.ninetieth.append(dur)
            else:
                raise LogtailerParsingException, "regmatch failed to match"
        except Exception, e:
            self.lock.release()
            raise LogtailerParsingException, "regmatch or contents failed with %s" % e
        self.lock.release()
    # example function for deep copy
    # takes no arguments
    # returns one object
    def deep_copy(self):
        '''This function should return a copy of the data structure used to
        maintain state.  This copy should different from the object that is
        currently being modified so that the other thread can deal with it
        without fear of it changing out from under it.  The format of this
        object is internal to the plugin.'''
        myret = dict( num_hits=self.num_hits,
                    num_gets=self.num_gets,
                    req_time=self.req_time,
                    num_two=self.num_two,
                    num_three=self.num_three,
                    num_four=self.num_four,
                    num_five=self.num_five,
                    ninetieth=self.ninetieth
                    )
        return myret
    # example function for reset_state
    # takes no arguments
    # returns nothing
    def reset_state(self):
        '''This function resets the internal data structure to 0 (saving
        whatever state it needs).  This function should be called
        immediately after deep copy with a lock in place so the internal
        data structures can't be modified in between the two calls.  If the
        time between calls to get_state is necessary to calculate metrics,
        reset_state should store now() each time it's called, and get_state
        will use the time since that now() to do its calculations'''
        self.num_hits = 0
        self.num_gets = 0
        self.req_time = 0
        self.num_two = 0
        self.num_three = 0
        self.num_four = 0
        self.num_five = 0
        self.ninetieth = list()
        self.last_reset_time = time.time()
    # example for keeping track of runtimes
    # takes no arguments
    # returns float number of seconds for this run
    def set_check_duration(self, dur):
        '''This function only used if logtailer is in cron mode.  If it is
        invoked, get_check_duration should use this value instead of calculating
        it.'''
        self.duration = dur
        self.dur_override = True
    def get_check_duration(self):
        '''This function should return the time since the last check.  If called
        from cron mode, this must be set using set_check_duration().  If in
        daemon mode, it should be calculated internally.'''
        if( self.dur_override ):
            duration = self.duration
        else:
            cur_time = time.time()
            duration = cur_time - self.last_reset_time
            # the duration should be within 10% of period
            acceptable_duration_min = self.period - (self.period / 10.0)
            acceptable_duration_max = self.period + (self.period / 10.0)
            if (duration < acceptable_duration_min or duration > acceptable_duration_max):
                raise LogtailerStateException, "time calculation problem - duration (%s) > 10%% away from period (%s)" % (duration, self.period)
        return duration
    # example function for get_state
    # takes no arguments
    # returns a dictionary of (metric => metric_object) pairs
    def get_state(self):
        '''This function should acquire a lock, call deep copy, get the
        current time if necessary, call reset_state, then do its
        calculations.  It should return a list of metric objects.'''
        # get the data to work with
        self.lock.acquire()
        try:
            mydata = self.deep_copy()
            check_time = self.get_check_duration()
            self.reset_state()
            self.lock.release()
        except LogtailerStateException, e:
            # if something went wrong with deep_copy or the duration, reset and continue
            self.reset_state()
            self.lock.release()
            raise e
        # crunch data to how you want to report it
        hits_per_second = mydata['num_hits'] / check_time
        gets_per_second = mydata['num_gets'] / check_time
        if (mydata['num_hits'] != 0):
             avg_req_time = mydata['req_time'] / mydata['num_hits']
        else:
             avg_req_time = 0
        two_per_second = mydata['num_two'] / check_time
        three_per_second = mydata['num_three'] / check_time
        four_per_second = mydata['num_four'] / check_time
        five_per_second = mydata['num_five'] / check_time
        # calculate 90th % request time
        ninetieth_list = mydata['ninetieth']
        ninetieth_list.sort()
        num_entries = len(ninetieth_list)
        if (num_entries != 0 ):
             ninetieth_element = ninetieth_list[int(num_entries * 0.9)]
        else:
             ninetieth_element = 0
        # package up the data you want to submit
        hps_metric = GangliaMetricObject('nginx_hits', hits_per_second, units='hps')
        gps_metric = GangliaMetricObject('nginx_gets', gets_per_second, units='hps')
        avgdur_metric = GangliaMetricObject('nginx_avg_dur', avg_req_time, units='sec')
        ninetieth_metric = GangliaMetricObject('nginx_90th_dur', ninetieth_element, units='sec')
        twops_metric = GangliaMetricObject('nginx_200', two_per_second, units='hps')
        threeps_metric = GangliaMetricObject('nginx_300', three_per_second, units='hps')
        fourps_metric = GangliaMetricObject('nginx_400', four_per_second, units='hps')
        fiveps_metric = GangliaMetricObject('nginx_500', five_per_second, units='hps')
        # return a list of metric objects
        return [ hps_metric, gps_metric, avgdur_metric, ninetieth_metric, twops_metric, threeps_metric, fourps_metric, fiveps_metric, ]

After deploying ganglia-logtailer on the monitored machine, create a cron job with the following entry:

*/1 * * * * root   /usr/local/bin/ganglia-logtailer --classname NginxLogtailer --log_file /usr/local/nginx-video/logs/access.log  --mode cron --gmetric_options '-C test_cluster -g nginx_status'

Reload the crond service; after about a minute, the corresponding metrics will appear in the ganglia web UI.
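
To check that the metrics are actually being published without waiting for the web UI, you can also query gmond directly: by default it serves an XML dump of all known metrics on its tcp_accept_channel port (8649). A quick sketch, assuming the default port:

# Dump gmond's XML metric report and look for the nginx metrics
# (8649 is the default tcp_accept_channel port; adjust if yours differs)
nc localhost 8649 | grep 'nginx_'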


3. Write your own module in a supported language (this article uses Python as an example)

Ganglia supports user-written Python modules. Here is the brief introduction from GitHub:
Writing a Python module is very simple. You just need to write it following a template and put the resulting Python module (.py) in /usr/lib(64)/ganglia/python_modules. 
A corresponding Python Configuration (.pyconf) file needs to reside in /etc/ganglia/conf.d/.
For example, here is a sample Python module that checks the machine's temperature:

acpi_file = "/proc/acpi/thermal_zone/THRM/temperature"

def temp_handler(name):  
    try:
        f = open(acpi_file, 'r')

    except IOError:
        return 0

    for l in f:
        line = l.split()
    # close the file so the handler does not leak a descriptor on every poll
    f.close()

    return int(line[1])

def metric_init(params):
    global descriptors, acpi_file

    if 'acpi_file' in params:
        acpi_file = params['acpi_file']

    d1 = {'name': 'temp',
        'call_back': temp_handler,
        'time_max': 90,
        'value_type': 'uint',
        'units': 'C',
        'slope': 'both',
        'format': '%u',
        'description': 'Temperature of host',
        'groups': 'health'}

    descriptors = [d1]

    return descriptors

def metric_cleanup():
    '''Clean up the metric module.'''
    pass

#This code is for debugging and unit testing
if __name__ == '__main__':
    metric_init({})
    for d in descriptors:
        v = d['call_back'](d['name'])
        print 'value for %s is %u' % (d['name'],  v)

With the module file in place, you also need a corresponding configuration file (placed at /etc/ganglia/conf.d/temp.pyconf) in the following format:
modules {
  module {
    name = "temp"
    language = "python"
    # The following params are examples only
    #  They are not actually used by the temp module
    param RandomMax {
      value = 600
    }
    param ConstantValue {
      value = 112
    }
  }
}

collection_group {
  collect_every = 10
  time_threshold = 50
  metric {
    name = "temp"
    title = "Temperature"
    value_threshold = 70
  }
}

Once these two files are in place, the module has been added successfully.
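
Before relying on gmond to pick it up, it can help to exercise the module standalone through its __main__ debug block, and then restart gmond so it loads the new .py and .pyconf files. The paths below assume the locations mentioned above, and the service name may vary by distribution:

# Run the module by itself using its __main__ debug block
python /usr/lib64/ganglia/python_modules/temp.py

# Restart gmond so it loads temp.py and /etc/ganglia/conf.d/temp.pyconf
service gmond restart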

The ganglia/gmond_python_modules repository on GitHub collects community-contributed modules for common services such as elasticsearch, filecheck, nginx_status, and MySQL. They are very useful; with only minor modifications they can usually be adapted to your own needs.

Other fairly practical user-contributed tools are also worth exploring, for example in the ganglia_contrib repository linked above.

If you have any questions, feel free to leave a comment for discussion.