
Boto3 AWS Resource Operations Summary (1)

I recently needed to query and cross-analyze some resources on AWS at work. The scenarios were all fairly simple, but this kind of semi-mechanical work is naturally best handed to Python. AWS's SDK for Python is called boto3, so I created a Python project with a venv interpreter, installed boto3 into it, and started coding. There were a few pitfalls along the way, recorded here for later reference.

Query AWS CloudWatch

Search CloudWatch for matching log records using a set of query conditions.

import time

import boto3

def query_cloudwatch_with_condition(log_group, query, start_time, end_time):
    """
    Search CloudWatch logs by some conditions.
    :param log_group: eg. '/aws/some_log_group'
    :param query: eg. f"fields @timestamp, @message \
                            | sort @timestamp desc \
                            | filter @message like /(?i)(some_filter)/ \
                            | filter @message like /Reason:\sError:/ \
                            | limit 10 \
                            | display @message"
    :param start_time: eg. int((datetime.today() - timedelta(days=5)).timestamp())
    :param end_time: eg. int(datetime.now().timestamp())
    :return: log message string.
    """
    cw_client = boto3.client('logs')
    
    start_query_response = cw_client.start_query(
        logGroupName=log_group,
        startTime=start_time,
        endTime=end_time,
        queryString=query,
    )

    query_id = start_query_response['queryId']
    response = None

    # NOTE: must wait for the query to complete; status starts as 'Scheduled'/'Running'.
    while response is None or response['status'] in ('Scheduled', 'Running'):
        print('Waiting for query to complete ...')
        time.sleep(1)
        response = cw_client.get_query_results(queryId=query_id)

    issue_detail = ''
    # NOTE: in my situation we only care about the first result row,
    # because we expect all matching logs to be identical.
    if response['results']:
        for item in response['results'][0]:
            if item['field'] == '@message':
                issue_detail = item['value']
                break

    return issue_detail
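A call might look like the following. The log group name and filter pattern are placeholders; the actual `query_cloudwatch_with_condition` call is commented out since it needs AWS credentials and a real log group.

```python
from datetime import datetime, timedelta

# '/aws/some_log_group' and 'some_filter' are placeholder values.
log_group = '/aws/some_log_group'
query = ("fields @timestamp, @message"
         " | sort @timestamp desc"
         " | filter @message like /(?i)(some_filter)/"
         " | limit 10"
         " | display @message")

# Search the last five days of logs.
start_time = int((datetime.today() - timedelta(days=5)).timestamp())
end_time = int(datetime.now().timestamp())

# message = query_cloudwatch_with_condition(log_group, query, start_time, end_time)
```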

Query DynamoDB

import boto3
from boto3.dynamodb.conditions import Key

def query_dynamodb_with_condition(key_condition_exp):
    """
    Query DynamoDB with a certain KeyConditionExpression (Query, not Scan).
    :param key_condition_exp: eg. Key('id').eq(certain_id) & Key('sk').begins_with('example::')
    :return: query results list
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('some-dynamodb-name')

    response = table.query(KeyConditionExpression=key_condition_exp)
    items = response['Items']

    # Post-filter items here if there are further conditions to apply.
    for item in items:
        pass

    return items

Scan DynamoDB

When scanning DynamoDB there is a pitfall: a single Scan call returns at most 1 MB of data, so to achieve a full scan the code has to keep following LastEvaluatedKey.

import boto3
from boto3.dynamodb.conditions import Attr

def scan_dynamodb_with_condition(filter_condition_exp):
    """
    Full scan dynamodb with certain condition_exp
    :param filter_condition_exp: eg. Attr('sk').eq('my_sk') & Attr('name').begins_with('Jone') & Attr('isDeleted').eq(False)
    :return: scan results list
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('some-dynamo-table')

    response = table.scan(FilterExpression=filter_condition_exp)

    # Loop to do full scan
    results = response['Items']
    index = 1
    while 'LastEvaluatedKey' in response:
        print(f'scanning....{index}')
        index += 1
        response = table.scan(
            ExclusiveStartKey=response['LastEvaluatedKey'],
            FilterExpression=filter_condition_exp)

        results.extend(response['Items'])
        print(len(results))

    return results
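For reference, the example `FilterExpression` in the docstring selects roughly the same items as the plain-Python predicate below (the records here are fake, just to illustrate the semantics). Keep in mind that a Scan still reads every item and only filters server-side afterwards, so the filter reduces the response size, not the read cost.

```python
# Fake records standing in for table items.
items = [
    {'sk': 'my_sk', 'name': 'Jone Doe', 'isDeleted': False},
    {'sk': 'my_sk', 'name': 'Alice', 'isDeleted': False},
    {'sk': 'other', 'name': 'Jone Roe', 'isDeleted': False},
]

# Local equivalent of:
# Attr('sk').eq('my_sk') & Attr('name').begins_with('Jone') & Attr('isDeleted').eq(False)
matched = [
    it for it in items
    if it['sk'] == 'my_sk'
    and it['name'].startswith('Jone')
    and it['isDeleted'] is False
]
```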

List S3 objects and read contents

Listing all objects under an S3 path also has a pitfall: list_objects_v2 returns at most 1,000 keys per call by default, so a full listing needs special handling too.

import json

import boto3

def get_all_s3_objects(s3, **base_kwargs):
    """
    Generator that lists all objects under a path, following continuation tokens.
    :param s3: s3 client created via boto3.client('s3')
    :param base_kwargs: extra list_objects_v2 arguments (e.g. Bucket, Prefix)
    :return: yields object metadata dicts to the caller
    """
    continuation_token = None
    while True:
        list_kwargs = dict(MaxKeys=1000, **base_kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token

        response = s3.list_objects_v2(**list_kwargs)
        yield from response.get('Contents', [])

        if not response.get('IsTruncated'):  # At the end of the list?
            break

        continuation_token = response.get('NextContinuationToken')
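As an alternative to the manual loop, boto3 ships a built-in paginator for `list_objects_v2` that handles the continuation token itself. A sketch (the client is passed in as a parameter so the listing logic stays testable):

```python
def list_keys_with_paginator(s3, bucket, prefix):
    """List all object keys under a prefix using boto3's built-in paginator."""
    paginator = s3.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent on empty pages, hence the default.
        keys.extend(obj['Key'] for obj in page.get('Contents', []))
    return keys

# Usage sketch:
# keys = list_keys_with_paginator(boto3.client('s3'), 'my-bucket-name', 'some/prefix')
```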


def main():
    bucket_name = 'my-bucket-name'
    s3_client = boto3.client('s3')
    # using prefix to define search folder
    prefix = 'this-is-some-path-without-prefix-and-postfix-slash'

    file_paths = []
    for file in get_all_s3_objects(s3_client, Bucket=bucket_name, Prefix=prefix):
        file_paths.append(file['Key'])

    print(f'length of file_paths: {len(file_paths)}')
    with open('./file_paths_results.json', 'w') as f:
        f.write(json.dumps(file_paths))
        print('finished writing file paths into json file')

Read S3 file contents

When reading the contents of S3 files, we ran into a problem where the content in the file Body (messages from AWS SQS) could not be correctly parsed as JSON. Due to time constraints we did not dig into it deeply; we simply replaced some non-JSON substrings to extract the content. How to properly load this kind of file content as JSON is worth revisiting later.

import json
import re
from pprint import pprint

import boto3
from dynamodb_json import json_util

def read_file_contents(s3client, bucket, path):
    """
    Read a file's content by its key (file path)
    :param s3client: eg. boto3.client('s3')
    :param bucket: eg. 'some-bucket-name'
    :param path: eg. 'some-path-to-my-file-with-postfix-no-slash-prefix'
    :return: file contents in json format
    """
    file_obj = s3client.get_object(
        Bucket=bucket,
        Key=path)
    
    # Read the object's body into file_data (bytes).
    file_data = file_obj['Body'].read()

    # TODO: some ugly string replaces here to strip non-JSON syntax.. will fix this later
    print_str = file_data.decode('utf-8').replace('\\', '').replace('""', '"').replace('"Body":"', '"Body":').replace(
        '}}}"}', '}}}}').replace('= "', '- ').replace('" Or', ' -').replace('" And', ' -')
    
    json_obj = json_util.loads(print_str)

    # NOTE: we use regex to match what we want.
    # match = re.findall('someKey":{"S":"(.*?)"', print_str)
    # if match:
    #     pprint(f'find key: {match[0]}')
    #     return match[0]
    # else:
    #     print(f'no key found!')
    #     return None

    return json_obj
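One likely cause, though this is an assumption not verified against the original data: an SQS message's Body is often itself a JSON document serialized as a string, so the file contains doubly-encoded JSON and needs `json.loads` applied twice rather than string surgery. A minimal sketch with a hypothetical payload:

```python
import json

# Hypothetical doubly-encoded payload: the outer Body value is a string
# containing escaped JSON, as SQS-forwarded messages often are.
raw = '{"Body": "{\\"orderId\\": 42, \\"status\\": \\"OK\\"}"}'

outer = json.loads(raw)            # first pass: Body is still a string
inner = json.loads(outer['Body'])  # second pass: decode the embedded JSON
```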

This post records the process of investigating a production data issue. The data has been anonymized; please adapt the configuration to your own environment.