函式計算自動化運維實戰1 -- 定時任務
阿新 • • 發佈:2019-12-03
函式計算
阿里雲函式計算是一個事件驅動的全託管計算服務。通過函式計算,您無需管理伺服器等基礎設施,只需編寫程式碼並上傳。函式計算會為您準備好計算資源,以彈性、可靠的方式執行您的程式碼,並提供日誌查詢,效能監控,報警等功能。藉助於函式計算,您可以快速構建任何型別的應用和服務,無需管理和運維。更棒的是,您只需要為程式碼實際執行消耗的資源付費,而程式碼未執行則不產生費用。
函式計算中的TimeTrigger
觸發器是觸發函式執行的方式。有時候您不想手動呼叫函式執行,您希望當某件事情發生時自動觸發函式的執行,這個事情就是事件源。您可以通過配置觸發器的方式設定事件源觸發函式執行。
例如,設定定時觸發器,可以在某個時間點觸發函式執行或者每隔5分鐘觸發函式一次;函式計算timetrigger
專題傳送門 => 函式計算進行自動化運維專題
定時任務自動化場景分析
定時任務示例場景1
某些賬號ak需要定期更換,以確保ak安全;
在下面的程式碼示例中,授權service具有訪問kms許可權的能力,使用kms,先對一個具有建立和刪除ak許可權的ak加密密文
解密,獲取具有建立和刪除ak許可權的AK
, 之後利用這個AK
進行ak的建立和刪除操作
說明: 除了使用kms加密解密來獲取較大許可權的
AK
, 通過函式計算環境變數的設定也是一種很好的方法
操作步驟
建立函式,函式建立可參考函式計算helloworld
注:記得給函式的service的role設定訪問kms許可權
給函式配置定時器,詳情可參考定時觸發函式
- 函式程式碼(函式計算已經內建了相關sdk,直接使用下面的程式碼即可)
# -*- coding: utf-8 -*- import logging, time, json from aliyunsdkcore import client from aliyunsdkram.request.v20150501.CreateAccessKeyRequest import CreateAccessKeyRequest from aliyunsdkram.request.v20150501.DeleteAccessKeyRequest import DeleteAccessKeyRequest from aliyunsdkkms.request.v20160120.EncryptRequest import EncryptRequest from aliyunsdkkms.request.v20160120.DecryptRequest import DecryptRequest from aliyunsdkcore.auth.credentials import StsTokenCredential # ak Encrypt content AK_CiphertextBlob = "NmQyY2ZhODMtMTlhYS00MTNjLTlmZjAtZTQxYTFiYWVmMzZmM1B1NXhTZENCNXVWd1dhdTNMWVRvb3V6dU9QcVVlMXRBQUFBQUFBQUFBQ3gwZTkzeGhDdHVzMWhDUCtZeVVuMWlobzlCa3VxMlErOXFHWWdXXXHELLwL1NSZTFvUURYSW9lak5Hak1lMnF0R2I1TWUxMEJiYmkzVnBwZHlrWGYzc3kyK2tQbGlKb2lHQ3lrZUdieHN2eXZwSVYzN2Qyd1cydz09" USER_NAME = "ls-test" # sub-account name LOGGER = logging.getLogger() def handler(event, context): creds = context.credentials sts_token_credential = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) # this demo ecs and function in same region, if not in same region, you need change region_id to your ecs instance's region_id clt = client.AcsClient(region_id=context.region, credential=sts_token_credential) request = DecryptRequest() request.set_CiphertextBlob(AK_CiphertextBlob) response = _send_request(clt, request) ak_info = json.loads(response.get("Plaintext","{}")) if not ak_info: return "KMS Decrypt ERROR" ak_id = ak_info["ak_id"] ak_secret = ak_info["ak_secret"] LOGGER.info("Decrypt sucessfully with key id: {}".format(response.get("KeyId","{}"))) clt2 = client.AcsClient(ak_id, ak_secret, context.region) request = CreateAccessKeyRequest() request.set_UserName(USER_NAME) # 給子賬號ls-test建立AK response = _send_request(clt2, request) create_ak_id = response.get("AccessKey",{}).get("AccessKeyId") if not create_ak_id: return LOGGER.info("create ak {} sucess!".format(create_ak_id)) time.sleep(10) request = DeleteAccessKeyRequest() request.set_UserName(USER_NAME) request.set_UserAccessKeyId(create_ak_id) response = _send_request(clt2, request) LOGGER.info("delete ak {} sucess!".format(create_ak_id)) return "OK" # send open api request def _send_request(clt, request): request.set_accept_format('json') try: response_str = clt.do_action_with_exception(request) LOGGER.debug(response_str) response_detail = json.loads(response_str) return response_detail except Exception as e: LOGGER.error(e)
AK 存在環境變數版本
# -*- coding: utf-8 -*-
import os, logging, time, json
from aliyunsdkcore import client
from aliyunsdkram.request.v20150501.CreateAccessKeyRequest import CreateAccessKeyRequest
from aliyunsdkram.request.v20150501.DeleteAccessKeyRequest import DeleteAccessKeyRequest
USER_NAME = "ls-test" # sub-account name
LOGGER = logging.getLogger()
def handler(event, context):
ak_id = os.environ['AK_ID']
ak_secret = os.environ['AK_SECRET']
clt = client.AcsClient(ak_id, ak_secret, context.region)
request = CreateAccessKeyRequest()
request.set_UserName(USER_NAME) # 給子賬號USER_NAME建立AK
response = _send_request(clt, request)
create_ak_id = response.get("AccessKey", "").get("AccessKeyId")
if not create_ak_id:
return
LOGGER.info("create ak {} sucess!".format(create_ak_id))
time.sleep(5)
request = DeleteAccessKeyRequest()
request.set_UserName(USER_NAME)
request.set_UserAccessKeyId(create_ak_id)
response = _send_request(clt, request)
LOGGER.info("delete ak {} sucess!".format(create_ak_id))
return "OK"
# send open api request
def _send_request(clt, request):
request.set_accept_format('json')
try:
response_str = clt.do_action_with_exception(request)
LOGGER.info(response_str)
response_detail = json.loads(response_str)
return response_detail
except Exception as e:
LOGGER.error(e)
定時任務示例場景2
定期檢查自己ecs對應暴露的埠,確保安全,比如你的ecs是一個網站伺服器,可能只需要對外暴露80埠就行,如果出現0.0.0.0/0這種允許所有人訪問的,需要出現報警或者自動修復
操作步驟
建立函式,函式建立可參考函式計算helloworld
注:記得給函式的service的role設定管理ecs許可權
- 給函式配置定時器,詳情可參考定時觸發函式
# -*- coding: utf-8 -*-
import logging
import json, random, string, time
from aliyunsdkcore import client
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeSecurityGroupAttributeRequest import DescribeSecurityGroupAttributeRequest
from aliyunsdkcore.auth.credentials import StsTokenCredential
LOGGER = logging.getLogger()
clt = None
# 需要檢查的ecs列表, 修改成你的ecs id 列表
ECS_INST_IDS = ["i-uf6h07zdscdg9g55zkxx", "i-uf6bwkxfxh847a1e2xxx"]
def handler(event, context):
creds = context.credentials
global clt
sts_token_credential = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
# this demo ecs and function in same region, if not in same region, you need change region_id to your ecs instance's region_id
clt = client.AcsClient(region_id=context.region, credential=sts_token_credential)
invalid_perssions = {}
for ecs_id in ECS_INST_IDS:
ret = check_and_modify_security_rule(ecs_id)
if ret:
invalid_perssions[ecs_id] = ret
return invalid_perssions
def check_and_modify_security_rule(instance_id):
LOGGER.info("check_and_modify_security_rule, instance_id is %s ", instance_id)
request = DescribeInstancesRequest()
request.set_InstanceIds(json.dumps([instance_id]))
response = _send_request(request)
SecurityGroupIds = []
if response is not None:
instance_list = response.get('Instances', {}).get('Instance')
for item in instance_list:
SecurityGroupIds = item.get('SecurityGroupIds', {}).get("SecurityGroupId", [])
break
if not SecurityGroupIds:
LOGGER.error("ecs {} do not have SecurityGroupIds".format(instance_id))
return
invalid_perssions = []
for sg_id in SecurityGroupIds:
request = DescribeSecurityGroupAttributeRequest()
request.set_SecurityGroupId(sg_id)
response = _send_request(request)
LOGGER.info("Find a securityGroup id {}".format(sg_id))
permissions = response.get("Permissions", {}).get("Permission",[])
if not permissions:
continue
for permission in permissions:
if permission["Direction"] == "ingress" and permission["SourceCidrIp"] == "0.0.0.0/0":
LOGGER.error("ecs {0} , SecurityGroup id {1}, have a risk, need fix; permission = {2}".format(instance_id, sg_id, permission))
invalid_perssions.append(permission)
return invalid_perssions
# send open api request
def _send_request(request):
request.set_accept_format('json')
try:
response_str = clt.do_action_with_exception(request)
LOGGER.debug(response_str)
response_detail = json.loads(response_str)
return response_detail
except Exception as e:
LOGGER.error(e)