A Summary of Accessing AWS Resources with Boto3 (1)
阿新 · Published: 2021-12-15
Recently at work I needed to query and cross-analyze some resources on AWS. The scenarios were all fairly simple, but this kind of semi-mechanical work is of course best handed off to Python. The SDK that AWS provides for Python is called boto3, so I created a Python project, chose a venv interpreter, installed boto3 into the project, and could then happily start writing code. There were a few pitfalls along the way, which I record here for future reference.
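Before any of the snippets below will run, boto3 needs credentials and a region. A minimal sketch (not from the original post; the profile and region names are placeholder assumptions):

import boto3

# boto3 picks up credentials from environment variables or ~/.aws/credentials
# by default; an explicit Session can pin a specific profile and region.
# 'my-profile' and 'us-east-1' are assumed placeholder values.
session = boto3.Session(profile_name='my-profile', region_name='us-east-1')
logs_client = session.client('logs')  # e.g. a CloudWatch Logs client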
Query AWS CloudWatch
Query CloudWatch for relevant log records based on certain search conditions.
import time

import boto3


def query_cloudwatch_with_condition(log_group, query, start_time, end_time):
    r"""
    Search CloudWatch logs by some conditions.
    :param log_group: e.g. '/aws/some_log_group'
    :param query: e.g. f"fields @timestamp, @message \
                        | sort @timestamp desc \
                        | filter @message like /(?i)(some_filter)/ \
                        | filter @message like /Reason:\sError:/ \
                        | limit 10 \
                        | display @message"
    :param start_time: e.g. int((datetime.today() - timedelta(days=5)).timestamp())
    :param end_time: e.g. int(datetime.now().timestamp())
    :return: log message string.
    """
    cw_client = boto3.client('logs')
    start_query_response = cw_client.start_query(
        logGroupName=log_group,
        startTime=start_time,
        endTime=end_time,
        queryString=query,
    )
    query_id = start_query_response['queryId']
    response = None
    # NOTE: Must wait for the query to complete ('Scheduled' precedes 'Running').
    while response is None or response['status'] in ('Scheduled', 'Running'):
        print('Waiting for query to complete ...')
        time.sleep(1)
        response = cw_client.get_query_results(queryId=query_id)
    issue_detail = ''
    if not response.get('results'):
        return issue_detail
    # NOTE: In my situation we only care about the first result row,
    # because we expect all matching logs to be the same.
    for item in response['results'][0]:
        if item['field'] == '@message':
            issue_detail = item['value']
            break
    return issue_detail
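A hypothetical invocation, assembled from the placeholder values in the docstring (the log group name and the filter pattern are assumptions, not real values):

from datetime import datetime, timedelta

query = ("fields @timestamp, @message "
         "| sort @timestamp desc "
         "| filter @message like /(?i)(some_filter)/ "
         "| limit 10 "
         "| display @message")
message = query_cloudwatch_with_condition(
    log_group='/aws/some_log_group',
    query=query,
    start_time=int((datetime.today() - timedelta(days=5)).timestamp()),
    end_time=int(datetime.now().timestamp()),
)
print(message)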
Query DynamoDB
import boto3
from boto3.dynamodb.conditions import Key


def query_dynamodb_with_condition(key_condition_exp):
    """
    Query DynamoDB with a certain key condition expression (Query, not Scan).
    :param key_condition_exp: e.g. Key('id').eq(certain_id) & Key('sk').begins_with('example::')
    :return: query results list
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('some-dynamodb-name')
    response = table.query(KeyConditionExpression=key_condition_exp)
    items = response['Items']
    # Filter items here if we have further conditions.
    for item in items:
        pass
    return items
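A hypothetical call mirroring the docstring example (the key names 'id'/'sk' and their values are placeholders):

from boto3.dynamodb.conditions import Key

items = query_dynamodb_with_condition(
    Key('id').eq('certain_id') & Key('sk').begins_with('example::')
)
print(f'found {len(items)} items')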
Scan DynamoDB
When scanning DynamoDB there is a pitfall: a single Scan call has an upper limit (it reads at most 1 MB of data per request), so to achieve a full scan the code needs to paginate using LastEvaluatedKey.
import boto3
from boto3.dynamodb.conditions import Attr


def scan_dynamodb_with_condition(filter_condition_exp):
    """
    Full scan DynamoDB with a certain filter condition expression.
    :param filter_condition_exp: e.g. Attr('sk').eq('my_sk') & Attr('name').begins_with('Jone') & Attr('isDeleted').eq(False)
    :return: scan results list
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('some-dynamo-table')
    response = table.scan(FilterExpression=filter_condition_exp)
    results = response['Items']
    # Loop to do a full scan: keep scanning while a LastEvaluatedKey is returned.
    index = 1
    while 'LastEvaluatedKey' in response:
        print(f'scanning....{index}')
        index += 1
        response = table.scan(
            ExclusiveStartKey=response['LastEvaluatedKey'],
            FilterExpression=filter_condition_exp)
        results.extend(response['Items'])
    print(len(results))
    return results
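A hypothetical call, again taken from the docstring example (the attribute names and values are placeholders):

from boto3.dynamodb.conditions import Attr

results = scan_dynamodb_with_condition(
    Attr('sk').eq('my_sk') & Attr('name').begins_with('Jone') & Attr('isDeleted').eq(False)
)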
List S3 objects and read contents
Listing all objects under an S3 path has a similar pitfall: a single list_objects_v2 call returns at most 1000 keys, so a full listing also needs special handling via the continuation token.
import json

import boto3


def get_all_s3_objects(s3, **base_kwargs):
    """
    List all files under a path, following continuation tokens.
    :param s3: s3 client created with boto3.client('s3')
    :param base_kwargs: extra list_objects_v2 args (e.g. Bucket, Prefix)
    :return: yields object metadata dicts to the caller
    """
    continuation_token = None
    while True:
        list_kwargs = dict(MaxKeys=1000, **base_kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token
        response = s3.list_objects_v2(**list_kwargs)
        yield from response.get('Contents', [])
        if not response.get('IsTruncated'):  # At the end of the list?
            break
        continuation_token = response.get('NextContinuationToken')


def main():
    bucket_name = 'my-bucket-name'
    s3_client = boto3.client('s3')
    # Use the prefix to define the search folder (no leading or trailing slash).
    prefix = 'this-is-some-path-without-prefix-and-postfix-slash'
    file_paths = []
    for file in get_all_s3_objects(s3_client, Bucket=bucket_name, Prefix=prefix):
        file_paths.append(file['Key'])
    print(f'length of file_paths: {len(file_paths)}')
    with open('./file_paths_results.json', 'w') as f:
        f.write(json.dumps(file_paths))
    print('finished writing file paths into json file')
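As an aside, boto3 also ships a built-in paginator that handles the continuation token internally. A minimal sketch of that alternative (the bucket name and prefix are the same placeholders as above):

import boto3

s3_client = boto3.client('s3')
paginator = s3_client.get_paginator('list_objects_v2')
# paginate() transparently follows NextContinuationToken across pages.
for page in paginator.paginate(Bucket='my-bucket-name', Prefix='some-prefix'):
    for obj in page.get('Contents', []):
        print(obj['Key'])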
Read S3 file contents
When reading the contents of the S3 files, we hit a problem: the content in the file Body (messages originating from AWS SQS) could not be converted to JSON correctly. Due to time constraints we did not dig into it deeply; we simply replaced some non-JSON-syntax substrings to extract the content. How to load this kind of file content into JSON properly is worth revisiting later; a tentative sketch of one possible approach follows the code below.
import json
import re
from pprint import pprint

import boto3
from dynamodb_json import json_util


def read_file_contents(s3client, bucket, path):
    """
    Read a file's content by its key (file path).
    :param s3client: e.g. boto3.client('s3')
    :param bucket: e.g. 'some-bucket-name'
    :param path: e.g. 'some-path-to-my-file-with-postfix-no-slash-prefix'
    :return: file contents in json format
    """
    file_obj = s3client.get_object(
        Bucket=bucket,
        Key=path)
    # Open the file object and read it into the variable file_data.
    file_data = file_obj['Body'].read()
    # TODO: we did some ugly string replacements here.. will fix this later.
    print_str = json_util.loads(file_data).replace('\\', '').replace('""', '"').replace('"Body":"', '"Body":').replace(
        '}}}"}', '}}}}').replace('= "', '- ').replace('" Or', ' -').replace('" And', ' -')
    json_obj = json_util.loads(print_str)
    # NOTE: we use regex to match what we want.
    # match = re.findall('someKey":{"S":"(.*?)"', print_str)
    # if match:
    #     pprint(f'find key: {match[0]}')
    #     return match[0]
    # else:
    #     print(f'no key found!')
    #     return None
    return json_obj
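For the TODO above, one guess (an assumption, not verified against the actual data) is that the SQS message body was JSON-serialized twice; if so, two plain json.loads passes would recover it without any string surgery:

import json


def read_double_encoded(s3client, bucket, path):
    # A sketch under the assumption that the payload is double-encoded JSON:
    # the first loads() parses the outer envelope, the second parses the
    # embedded SQS message body. 'Body' as the key name is an assumption too.
    raw = s3client.get_object(Bucket=bucket, Key=path)['Body'].read()
    outer = json.loads(raw)
    return json.loads(outer['Body'])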
This post records the process of investigating this production data issue. The data has been anonymized; please adapt the configuration to your own environment.