基於Python實現的系統SLA可用性統計
基於Python實現的系統SLA可用性統計
1. 介紹
SLA是Service Level Agreement
的英文縮寫,也叫服務質量協議。根據SRE Google運維解密一書中的定義:
SLA是服務與使用者之間的一個明確的,或者不明確的協議,描述了在達到或者沒有達到SLO(
Service Level Objective
)之後的後果。
SLO通常需要SLI(Service Level Indicator
)去描述,系統常見的指標有吞吐量(每秒鐘可處理的請求數量)、響應時間和可用性等,如系統的平均響應時間<500ms;系統可用性要達到99.99%。
在單體機器的架構中,系統的可用性可以近似等於正常執行時間/(正常執行時間+故障時間)
2. 所需環境及知識
軟體 | 用途 | 知識參考 |
---|---|---|
Elasticsearch叢集 | 系統日誌儲存中心,SLA中指標結果計算的資料來源 | range-on-date, terms-aggregation |
MySQL資料庫 | Elasticsearch叢集聚合結果的持久化儲存 | create-database, create-user, |
Grafana | 使用MySQL作為資料來源最終展示系統可用性 | MySQL Datasource |
Python3 | 定時獲取Elasticsearch叢集資料並存儲到MySQL | Python Elasticsearch Client, APScheduler-User-Guide, Connector-Python-Example |
知識詳細說明:
- rang-on-date:Elasticsearch查詢某個時間範圍內的相關日誌資訊;
- terms-aggregation:Elasticsearch聚合查詢的語法,這裡需要聚合日誌記錄的狀態碼;
- create-database:建立資料庫語法參考,建立的資料庫儲存Elasticsearch的日誌聚合結果;
- create-user:建立資料庫使用者語法參考;
- grant-overview:授權使用者許可權語法參考,我這裡授予使用者擁有資料庫的所有許可權;
- aggregate-function:SQL聚合查詢語法參考;
- MySQL Datasource:Grafana配置MySQL資料來源及預設的查詢巨集函式的定義參考;
- Python Elasticsearch Client:Python連線到Elasticsearch的API,可以使用它查詢Elasticsearch的資訊;
- APScheduler-User-Guide:Python定時任務框架,這裡需要定時任務查詢Elasticsearch獲取日誌資訊;
- Connector-Python-Example:Python連線MySQL資料庫的Demo。
3. 總體思路
在Kibana上,我們可以通過Web頁面查詢檢視Elasticsearch儲存的記錄,也可以使用Kibana的Visualize視覺化Elasticsearch儲存的資料。這裡可以通過Python定時任務查詢Elasticsearch的資料,然後過濾得到有用的資料儲存到MySQL資料庫中,最後藉助Grafana的視覺化圖形Stat
查詢展示MySQL的結構化資料得到一個分散式系統的可用性(SA:Service Availability)近似值。SA計算公式:1- (響應碼大於等於500的數量 / 請求總數)
4. 詳細實現
4.1 Elasticsearch結構化查詢語言
在此處,查詢的時間範圍為過去五分鐘到現在的時間,而且還需要聚合伺服器響應狀態碼欄位,所以得到的DSL語句大致為:
"query": {
"bool": {
"filter": {
"range": {
"@timestamp": {
"gte": "2020-10-16T14:06:10",
"lt": "2020-10-16T14:11:10"
}
}
}
}
},
"size": 0,
"aggs": {
"group_by_status": {
"terms": {
"field": "http_response_code.keyword"
}
}
}
其中,gte
表示時間大於等於timestring1
,lt
表示時間小於timestring2
,查詢的時間區間為[timestring1,timestring2);aggs
是聚合查詢,聚合的欄位為http_response_code.keyword
。我這裡查詢logstash-nginx-*
索引,在Kibana
上的Dev Tools
上查詢大致上可以得到以下結果:
上圖顯示了聚合得到的狀態碼200的數量為10,如果有其它的狀態碼也會一一統計出來的。
4.2 Python使用Elasticsearch API
有了Elasticsearch的查詢語句,還需要藉助Elasticsearch的Python Client API才可以在Python程式中查詢到Elasticsearch的資料。
- 安裝Python Elasticsearch Client模組
pip install elasticsearch
- 查詢Elasticsearch
from elasticsearch import Elasticsearch
# query ElasticSearch
es = Elasticsearch([{'host': self.elasticsearch_config["host"], 'port': self.elasticsearch_config["port"]}],
http_auth=(self.elasticsearch_config['username'], self.elasticsearch_config['password']))
response = es.search(index=self.elasticsearch_config['index'], body=self.elasticsearch_config["query_object"])
# get our needed buckets
buckets = response['aggregations']['group_by_status']['buckets']
其中的body引數為elasticsearch的查詢語句,在此是封裝成了JSON物件格式。
4.3 資料庫準備
Python查詢到的Elasticsearch資料結果需要持久化到MySQL資料庫,在此需要建立儲存資料庫、資料庫使用者及資料表。資料庫表設計如下:
- es_sla表
欄位 | 資料型別 | 說明 | 註釋 |
---|---|---|---|
id | bigint(20) unsinged | 記錄ID | 主鍵,自動遞增 |
from_time | datetime | 開始時間 | |
from_timestamp | int unsigned | 開始時間的Unix時間戳 | 與from_time表示的時間一致,只是型別不一樣 |
to_time | datetime | 結束時間 | |
to_timestamp | int unsigned | 結束時間的Unix時間戳 | 與to_time表示的時間一致,只是型別不一樣 |
status_code | smallint unsigned | 狀態碼 | |
count | smallint unsigned | 對應狀態碼的數量 | |
es_index | varchar(100) | 對應的查詢Elasticsearch索引 |
- SQL語句
create database if not exists elasticsearch CHARACTER SET utf8 COLLATE utf8_general_ci;
create user elastic identified by '123456';
grant all privileges on elasticsearch.* to elastic;
create table es_sla
(
id bigint(20) unsigned primary key auto_increment,
from_time datetime,
from_timestamp int unsigned,
to_time datetime,
to_timestamp int unsigned,
status_code smallint unsigned,
count smallint unsigned,
es_index varchar(100)
);
4.4 Python定時任務
這裡設計的查詢的是每五分鐘就執行查詢Elasticsearch資料操作,python的定時任務使用框架APScheduler
。查閱資料,需要先安裝模組,然後定義定時執行的函式,設定執行的觸發器。
- 安裝模組:
pip install apscheduler
- 設定每五分鐘執行的定時器程式碼:
from apscheduler.schedulers.blocking import BlockingScheduler
# BlockingScheduler
scheduler = BlockingScheduler() #例項化定時器,如果要後臺執行,可以使用BackgroundScheduler
scheduler.add_job(es_job.query_es_job, 'interval', seconds=5 * 60)
scheduler.start()
4.5 Python連線資料庫及持久化
這裡連線的MySQL資料庫使用MySQL官方提供的聯結器Connector,我把它封裝成函式獲取一個數據庫連線物件。
- 安裝模組
pip install mysql-connector mysql-connector-python
- 獲取連線物件
import mysql.connector
def get_connection(db_config):
connection = mysql.connector.connect(user=db_config['username'],
password=db_config['password'],
host=db_config['host'],
database=db_config['database'])
return connection
- 插入資料表(持久化)
def insert_list(self, es_sla_list):
cursor = self.connection.cursor()
insert_sql = ("insert into es_sla (from_time,from_timestamp,to_time,"
"to_timestamp,status_code,count,es_index)"
"values (%(from_time)s,%(from_timestamp)s,"
"%(to_time)s,%(to_timestamp)s,%(status_code)s,%(count)s,%(es_index)s)")
row = 0
for es_sla in es_sla_list:
print(es_sla['status_code'])
cursor.execute(insert_sql, es_sla)
row = row + 1
if row == len(es_sla_list):
self.connection.commit()
else:
row = 0
cursor.close()
return row
4.6 定時任務執行內容
在前面的4.1-4.5中,我介紹了一些基礎,最終還是要定時任務執行的操作完成具體的Elasticsearch查詢操作和資料持久化操作。定時任務的詳細具體執行過程如下:
- 具體實現程式碼:
# function which query elasticsearch job
def query_es_job(self):
# set up query time ranger
str_from_time = (datetime.utcnow() + timedelta(seconds=-300)).strftime('%Y-%m-%dT%H:%M:%S')
str_to_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')
self.elasticsearch_config["query_object"]["query"]["bool"]["filter"]["range"]["@timestamp"]['gte'] \
= str_from_time
self.elasticsearch_config["query_object"]["query"]["bool"]["filter"]["range"]["@timestamp"]['lt'] = str_to_time
# query ElasticSearch
es = Elasticsearch([{'host': self.elasticsearch_config["host"], 'port': self.elasticsearch_config["port"]}],
http_auth=(self.elasticsearch_config['username'], self.elasticsearch_config['password']))
response = es.search(index=self.elasticsearch_config['index'], body=self.elasticsearch_config["query_object"])
# get our needed buckets
buckets = response['aggregations']['group_by_status']['buckets']
# set up time format for database attributes
from_time = (datetime.strptime(str_from_time, '%Y-%m-%dT%H:%M:%S') + timedelta(hours=8))\
.strftime('%Y-%m-%d %H:%M:%S')
to_time = (datetime.strptime(str_to_time, '%Y-%m-%dT%H:%M:%S') + timedelta(hours=8))\
.strftime('%Y-%m-%d %H:%M:%S')
if len(buckets) > 0 and buckets is not None:
# construct record object list
es_sla_list = []
for bucket in buckets:
es_sla = {
'from_time': from_time,
'from_timestamp': int(time.mktime(time.strptime(from_time, '%Y-%m-%d %H:%M:%S'))),
'to_time': to_time,
'to_timestamp': int(time.mktime(time.strptime(to_time, '%Y-%m-%d %H:%M:%S'))),
'status_code': int(bucket['key']),
'count': bucket['doc_count'],
'es_index': self.elasticsearch_config['index']
}
es_sla_list.append(es_sla)
# create ES_SLA_DAO object and insert
es_sla_dao = ES_SLA_DAO(self.database_config)
row = es_sla_dao.insert_list(es_sla_list)
if row == len(es_sla_list):
print_message('execute sql success.')
else:
print_message('execute sql fail.')
else:
print_message('Buckets length is 0. Noting to do.')
4.7 Grafana展示SA資料
Grafana展示系統可用性SA是通過配置MySQL資料庫源的,然後在Dashboard面板上新增Stat圖。在我這裡是直接編輯SQL語句查詢出來,使用MySQL記錄es_sla表,以to_time
(結束時間)作為Grafana的時間戳,依據公式SA = 1- (響應碼大於等於500的數量 / 請求總數)
進行查詢。
- SQL語句
SELECT
UNIX_TIMESTAMP(to_time) as time,
1 - sum(case when status_code >= 500 then count else 0 end) / sum(count) AS "SLA"
FROM es_sla
WHERE
UNIX_TIMESTAMP(to_time) BETWEEN 1603249298 AND 1603335698
GROUP BY 1
ORDER BY time
- SQL新增Grafana巨集函式
SELECT
UNIX_TIMESTAMP(to_time) as time,
1 - sum(case when status_code >= 500 then count else 0 end) / sum(count) AS "SLA"
FROM es_sla
WHERE
UNIX_TIMESTAMP(to_time) BETWEEN $__unixEpochFrom() AND $__unixEpochTo()
GROUP BY 1
ORDER BY time
根據Grafana的參考資料,$__unixEpochFrom()
表示當前選擇的開始時間(UNIX_TIMESTAMP),$__unixEpochTo()
表示當前選擇的結束時間(UNIX_TIMESTAMP)。還需要調整顯示的stat
圖形的單位為0.0~1.0
的百分數,保留4
位小數,如下圖所示:
5. 資料
所有的程式碼我上傳到GitHub倉庫:es-sla . 專案檔案結構如下圖所示:
- 說明:
目錄/檔案 | 說明 |
---|---|
config | 配置目錄,放置了ES的配置檔案elasticsearch.conf、MySQL資料庫配置檔案db.conf、建表語句檔案db.sql和待查詢的ES索引儲存檔案index.conf |
src | 原始碼目錄,包括資料庫的操作dao,定時任務job和一些工具包 |
main.py | 主執行程式,主要是載入配置檔案,建立定時任務執行 |
.gitignore | git忽略檔案的配置檔案 |
requirement.txt | Python程式所需的模組包,可以執行pip install -r requirement.txt安裝 |