1. 程式人生 > 實用技巧 >基於Python實現的系統SLA可用性統計

基於Python實現的系統SLA可用性統計

基於Python實現的系統SLA可用性統計

1. 介紹

  SLA是Service Level Agreement的英文縮寫,也叫服務質量協議。根據SRE Google運維解密一書中的定義:

 SLA是服務與使用者之間的一個明確的,或者不明確的協議,描述了在達到或者沒有達到SLO(Service Level Objective)之後的後果。

  SLO通常需要SLI(Service Level Indicator)去描述,系統常見的指標有吞吐量(每秒鐘可處理的請求數量)、響應時間和可用性等,如系統的平均響應時間<500ms;系統可用性要達到99.99%。

  在單體機器的架構中,系統的可用性可以近似等於正常執行時間/(正常執行時間+故障時間)

,但在分散式叢集架構中,該計算方式不再適用,因為系統的可以被反向代理到其它的伺服器上,服務呈現一直都可用。對於分散式系統,一般以請求的成功率計算系統的可用性(即伺服器響應狀態碼<500的)。本文主要介紹一個分散式系統的SLA可用性統計的方法及其詳細實現過程。

2. 所需環境及知識

軟體 用途 知識參考
Elasticsearch叢集 系統日誌儲存中心,SLA中指標結果計算的資料來源 range-on-date, terms-aggregation
MySQL資料庫 Elasticsearch叢集聚合結果的持久化儲存 create-database, create-user,
grant-overview
, aggregate-function
Grafana 使用MySQL作為資料來源最終展示系統可用性 MySQL Datasource
Python3 定時獲取Elasticsearch叢集資料並存儲到MySQL Python Elasticsearch Client, APScheduler-User-Guide, Connector-Python-Example

  知識詳細說明:

  • rang-on-date:Elasticsearch查詢某個時間範圍內的相關日誌資訊;
  • terms-aggregation:Elasticsearch聚合查詢的語法,這裡需要聚合日誌記錄的狀態碼;
  • create-database:建立資料庫語法參考,建立的資料庫儲存Elasticsearch的日誌聚合結果;
  • create-user:建立資料庫使用者語法參考;
  • grant-overview:授權使用者許可權語法參考,我這裡授予使用者擁有資料庫的所有許可權;
  • aggregate-function:SQL聚合查詢語法參考;
  • MySQL Datasource:Grafana配置MySQL資料來源及預設的查詢巨集函式的定義參考;
  • Python Elasticsearch Client:Python連線到Elasticsearch的API,可以使用它查詢Elasticsearch的資訊;
  • APScheduler-User-Guide:Python定時任務框架,這裡需要定時任務查詢Elasticsearch獲取日誌資訊;
  • Connector-Python-Example:Python連線MySQL資料庫的Demo。

3. 總體思路

  在Kibana上,我們可以通過Web頁面查詢檢視Elasticsearch儲存的記錄,也可以使用Kibana的Visualize視覺化Elasticsearch儲存的資料。這裡可以通過Python定時任務查詢Elasticsearch的資料,然後過濾得到有用的資料儲存到MySQL資料庫中,最後藉助Grafana的視覺化圖形Stat查詢展示MySQL的結構化資料得到一個分散式系統的可用性(SA:Service Availability)近似值。SA計算公式1- (響應碼大於等於500的數量 / 請求總數)

4. 詳細實現

4.1 Elasticsearch結構化查詢語言

  在此處,查詢的時間範圍為過去五分鐘到現在的時間,而且還需要聚合伺服器響應狀態碼欄位,所以得到的DSL語句大致為:

"query": {
    "bool": {
          "filter": {
              "range": {
                  "@timestamp": {
                      "gte": "2020-10-16T14:06:10",
                      "lt": "2020-10-16T14:11:10"
                  }
              }
          }
    }
},
"size": 0,
"aggs": {
    "group_by_status": {
        "terms": {
             "field": "http_response_code.keyword"
         }
     }
}

  其中,gte表示時間大於等於timestring1lt表示時間小於timestring2,查詢的時間區間為[timestring1,timestring2);aggs是聚合查詢,聚合的欄位為http_response_code.keyword。我這裡查詢logstash-nginx-*索引,在Kibana上的Dev Tools上查詢大致上可以得到以下結果:

  上圖顯示了聚合得到的狀態碼200的數量為10,如果有其它的狀態碼也會一一統計出來的。

4.2 Python使用Elasticsearch API

  有了Elasticsearch的查詢語句,還需要藉助Elasticsearch的Python Client API才可以在Python程式中查詢到Elasticsearch的資料。

  • 安裝Python Elasticsearch Client模組
pip install elasticsearch
  • 查詢Elasticsearch
from elasticsearch import Elasticsearch

# query ElasticSearch
es = Elasticsearch([{'host': self.elasticsearch_config["host"], 'port': self.elasticsearch_config["port"]}],
                   http_auth=(self.elasticsearch_config['username'], self.elasticsearch_config['password']))
response = es.search(index=self.elasticsearch_config['index'], body=self.elasticsearch_config["query_object"])
# get our needed buckets
buckets = response['aggregations']['group_by_status']['buckets']

  其中的body引數為elasticsearch的查詢語句,在此是封裝成了JSON物件格式。

4.3 資料庫準備

  Python查詢到的Elasticsearch資料結果需要持久化到MySQL資料庫,在此需要建立儲存資料庫、資料庫使用者及資料表。資料庫表設計如下:

  • es_sla表
欄位 資料型別 說明 註釋
id bigint(20) unsinged 記錄ID 主鍵,自動遞增
from_time datetime 開始時間
from_timestamp int unsigned 開始時間的Unix時間戳 與from_time表示的時間一致,只是型別不一樣
to_time datetime 結束時間
to_timestamp int unsigned 結束時間的Unix時間戳 與to_time表示的時間一致,只是型別不一樣
status_code smallint unsigned 狀態碼
count smallint unsigned 對應狀態碼的數量
es_index varchar(100) 對應的查詢Elasticsearch索引
  • SQL語句
create database if not exists elasticsearch CHARACTER SET utf8 COLLATE utf8_general_ci;
create user elastic identified by '123456';
grant all privileges on elasticsearch.* to elastic;
create table es_sla
(
    id bigint(20) unsigned primary key auto_increment,
    from_time datetime,
    from_timestamp int unsigned,
    to_time datetime,
    to_timestamp int unsigned,
    status_code smallint unsigned,
    count smallint unsigned,
    es_index varchar(100)
);

4.4 Python定時任務

  這裡設計的查詢的是每五分鐘就執行查詢Elasticsearch資料操作,python的定時任務使用框架APScheduler。查閱資料,需要先安裝模組,然後定義定時執行的函式,設定執行的觸發器。

  • 安裝模組:
pip install apscheduler
  • 設定每五分鐘執行的定時器程式碼:
from apscheduler.schedulers.blocking import BlockingScheduler

# BlockingScheduler
scheduler = BlockingScheduler()		#例項化定時器,如果要後臺執行,可以使用BackgroundScheduler
scheduler.add_job(es_job.query_es_job, 'interval', seconds=5 * 60)
scheduler.start()

4.5 Python連線資料庫及持久化

  這裡連線的MySQL資料庫使用MySQL官方提供的聯結器Connector,我把它封裝成函式獲取一個數據庫連線物件。

  • 安裝模組
pip install mysql-connector mysql-connector-python
  • 獲取連線物件
import mysql.connector

def get_connection(db_config):
    connection = mysql.connector.connect(user=db_config['username'],
                                         password=db_config['password'],
                                         host=db_config['host'],
                                         database=db_config['database'])
    return connection
  • 插入資料表(持久化)
def insert_list(self, es_sla_list):

    cursor = self.connection.cursor()
    insert_sql = ("insert into es_sla (from_time,from_timestamp,to_time,"
                  "to_timestamp,status_code,count,es_index)"
                  "values (%(from_time)s,%(from_timestamp)s,"
                  "%(to_time)s,%(to_timestamp)s,%(status_code)s,%(count)s,%(es_index)s)")
    row = 0
    for es_sla in es_sla_list:
        print(es_sla['status_code'])
        cursor.execute(insert_sql, es_sla)
        row = row + 1
    if row == len(es_sla_list):
        self.connection.commit()
    else:
        row = 0
    cursor.close()
    return row

4.6 定時任務執行內容

  在前面的4.1-4.5中,我介紹了一些基礎,最終還是要定時任務執行的操作完成具體的Elasticsearch查詢操作和資料持久化操作。定時任務的詳細具體執行過程如下:

  • 具體實現程式碼:
# function which query elasticsearch job
def query_es_job(self):
    # set up query time ranger
    str_from_time = (datetime.utcnow() + timedelta(seconds=-300)).strftime('%Y-%m-%dT%H:%M:%S')
    str_to_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')
    self.elasticsearch_config["query_object"]["query"]["bool"]["filter"]["range"]["@timestamp"]['gte'] \
        = str_from_time
    self.elasticsearch_config["query_object"]["query"]["bool"]["filter"]["range"]["@timestamp"]['lt'] = str_to_time

    # query ElasticSearch
    es = Elasticsearch([{'host': self.elasticsearch_config["host"], 'port': self.elasticsearch_config["port"]}],
                       http_auth=(self.elasticsearch_config['username'], self.elasticsearch_config['password']))
    response = es.search(index=self.elasticsearch_config['index'], body=self.elasticsearch_config["query_object"])
    # get our needed buckets
    buckets = response['aggregations']['group_by_status']['buckets']

    # set up time format for database attributes
    from_time = (datetime.strptime(str_from_time, '%Y-%m-%dT%H:%M:%S') + timedelta(hours=8))\
        .strftime('%Y-%m-%d %H:%M:%S')
    to_time = (datetime.strptime(str_to_time, '%Y-%m-%dT%H:%M:%S') + timedelta(hours=8))\
        .strftime('%Y-%m-%d %H:%M:%S')

    if len(buckets) > 0 and buckets is not None:
        # construct record object list
        es_sla_list = []
        for bucket in buckets:
            es_sla = {
                'from_time': from_time,
                'from_timestamp': int(time.mktime(time.strptime(from_time, '%Y-%m-%d %H:%M:%S'))),
                'to_time': to_time,
                'to_timestamp': int(time.mktime(time.strptime(to_time, '%Y-%m-%d %H:%M:%S'))),
                'status_code': int(bucket['key']),
                'count': bucket['doc_count'],
                'es_index': self.elasticsearch_config['index']
            }
            es_sla_list.append(es_sla)

        # create ES_SLA_DAO object and insert
        es_sla_dao = ES_SLA_DAO(self.database_config)
        row = es_sla_dao.insert_list(es_sla_list)
        if row == len(es_sla_list):
            print_message('execute sql success.')
        else:
            print_message('execute sql fail.')
    else:
        print_message('Buckets length is 0. Noting to do.')

4.7 Grafana展示SA資料

  Grafana展示系統可用性SA是通過配置MySQL資料庫源的,然後在Dashboard面板上新增Stat圖。在我這裡是直接編輯SQL語句查詢出來,使用MySQL記錄es_sla表,以to_time(結束時間)作為Grafana的時間戳,依據公式SA = 1- (響應碼大於等於500的數量 / 請求總數)進行查詢。

  • SQL語句
SELECT
  UNIX_TIMESTAMP(to_time) as time,
  1 - sum(case when status_code >= 500 then count else 0 end) / sum(count) AS "SLA"
FROM es_sla
WHERE
  UNIX_TIMESTAMP(to_time) BETWEEN 1603249298 AND 1603335698
GROUP BY 1
ORDER BY time
  • SQL新增Grafana巨集函式
SELECT
  UNIX_TIMESTAMP(to_time) as time,
  1 - sum(case when status_code >= 500 then count else 0 end) / sum(count) AS "SLA"
FROM es_sla
WHERE
  UNIX_TIMESTAMP(to_time) BETWEEN $__unixEpochFrom() AND $__unixEpochTo()
GROUP BY 1
ORDER BY time

  根據Grafana的參考資料,$__unixEpochFrom()表示當前選擇的開始時間(UNIX_TIMESTAMP),$__unixEpochTo()表示當前選擇的結束時間(UNIX_TIMESTAMP)。還需要調整顯示的stat圖形的單位為0.0~1.0的百分數,保留4位小數,如下圖所示:

5. 資料

  所有的程式碼我上傳到GitHub倉庫:es-sla . 專案檔案結構如下圖所示:

  • 說明:
目錄/檔案 說明
config 配置目錄,放置了ES的配置檔案elasticsearch.conf、MySQL資料庫配置檔案db.conf、建表語句檔案db.sql和待查詢的ES索引儲存檔案index.conf
src 原始碼目錄,包括資料庫的操作dao,定時任務job和一些工具包
main.py 主執行程式,主要是載入配置檔案,建立定時任務執行
.gitignore git忽略檔案的配置檔案
requirement.txt Python程式所需的模組包,可以執行pip install -r requirement.txt安裝