1. 程式人生 > 其它 >谷歌雲k8s event webhook訊息推送

谷歌雲k8s event webhook訊息推送

(1)谷歌雲monitoring服務裡面建立自定義查詢,傳送到日誌接收器,過濾gke日誌如下:

(2)日誌接收器選一個類似kafka的元件(pub/sub),然後針對那個topic建立訊息訂閱,再其建立日誌觸發器(python指令碼),產生一條日誌,就會實時傳送到kakfa,然後被訂閱者消費,同時觸發指令碼處理日誌,篩選重要的推送釘釘。

這就是日誌服務傳送到kafka的日誌,然後用指令碼處理這條日誌就可以,看谷歌雲日誌觸發器語法,自己寫指令碼處理日誌:

## ==================================================
##    讓讀書成為一種生活方式。就像吃喝拉撒每天必須要乾的事,
## 終有一天你的舉止、言談、氣質會不一樣。 
##                                        —
- 5sdba ## ## Created Date: Saturday, 2021-07-08, 11:55:05 am ## copyright (c): SZWW Tech. LTD. ## Engineer: async ## Module Name: ## Revision: v0.01 ## Description: ## ## Revision History : ## Revision editor date Description ## v0.01 async 2021-07-08 File Created ## ================================================== import requests import json,hmac,base64,hashlib import urllib.parse import datetime,time import pytz tz
= pytz.timezone('Asia/Shanghai') URL = "https://oapi.dingtalk.com/robot/send?access_token=cxxxxxx" secret = "xxxxxx" def get_timestamp_sign(): timestamp = str(round(time.time() * 1000)) secret_enc = secret.encode('utf-8') string_to_sign = '{}\n{}'.format(timestamp, secret) string_to_sign_enc
= string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return (timestamp, sign) def get_signed_url(): timestamp, sign = get_timestamp_sign() webhook = URL + "&timestamp="+timestamp+"&sign="+sign return webhook def get_webhook(mode): if mode == 0: # only 敏感字 webhook = URL elif mode == 1 or mode ==2 : # 敏感字和加簽 或 # 敏感字+加簽+ip webhook = get_signed_url() else: webhook = "" print("error! mode: ",mode," webhook : ",webhook) return webhook def send_msg(event,context): webhook = get_webhook(1) str_log = base64.b64decode(event['data']).decode('utf-8') k8s_log=json.loads(str_log) print(k8s_log) ops_time = datetime.datetime.strptime(k8s_log["receiveTimestamp"].replace("'", '').replace('T', ' ').split('.')[0],"%Y-%m-%d %H:%M:%S") + datetime.timedelta(hours=8) ops_cluster = k8s_log['resource']['labels']['cluster_name'] # ops_pod = k8s_log['jsonPayload']['metadata']['namespace'] ops_pod=k8s_log['jsonPayload']['metadata']['name'].split('.')[0] ops_namespace=k8s_log['jsonPayload']['metadata']['namespace'] ops_type = k8s_log['jsonPayload']['metadata']['managedFields'][0]['operation'] ops_info=k8s_log['severity'] if 'WARNING' in ops_info: log_pri="<font color=#D3F545 size=3>warning</font>" elif 'INFO' in ops_info: log_pri="<font color=#008000 size=3> info </font>" else: log_pri="<font color=#FF4109 size=3> sos </font>" ops_log = k8s_log['jsonPayload']['message'] if ops_type not in ['Pulled', 'Scheduled']: pagrem = { "msgtype": "markdown", "markdown": { "title": "K8s ops" + "....", "text": "<font color=#FF0000 size=3>操作詳情:</font>" + "\n\n>日誌時間 :" + str(ops_time) + "\n\n>叢集名稱:" + str(ops_cluster) + "\n\n>pod組名:" + str(ops_pod) + "\n\n>名稱空間:" + str(ops_namespace) + "\n\n>日誌級別:" + str(log_pri) + "\n\n>日誌詳情:" + str(ops_log) }, "at": { "atMobiles": [ "xxxxx" ] }, "isAtAll": "False" } headers = { 'Content-Type': 'application/json' } requests.post(url=webhook, data=json.dumps(pagrem), headers=headers) print(json.dumps(pagrem))

(3)最後完成的效果如下:

可以再優化一下指令碼,做一些詳細的判斷,達到自己需要的目標。

業餘經濟愛好者