最近在工作中需要对AWS上的部分资源进行查询和交叉分析,虽然场景都比较简单,但是这种半机械的工作当然还是交给Python来搞比较合适。AWS为Python提供的SDK库叫做boto3,所以我们建立一个Python项目,Interpreter选择的是venv解析,再将boto3安装到项目中,下面就可以开始愉快地写代码了。这个过程中有一些坑,记录在这里,以便后续查阅。

Query AWS CloudWatch

根据一定的搜索条件去CloudWatch中查找相关的log记录。

import boto3

def query_cloudwatch_with_condition(log_group, query, start_time, end_time):
    """
    Search CloudWatch logs by some conditions.
    :param log_group: eg. '/aws/some_log_group'
    :param query: eg. f"fields @timestamp, @message \
                            | sort @timestamp desc \
                            | filter @message like /(?i)(some_filter)/ \
                            | filter @message like /Reason:\sError:/ \
                            | limit 10 \
                            | display @message"
    :param start_time: eg. int((datetime.today() - timedelta(days=5)).timestamp())
    :param end_time: eg. int(datetime.now().timestamp())
    :return: log message string.
    """
    cw_client = boto3.client('logs')
    
    start_query_response = cw_client.start_query(
        logGroupName=log_group,
        startTime=start_time,
        endTime=end_time,
        queryString=query,
    )

    query_id = start_query_response['queryId']
    response = None

    # NOTE: Must wait for query to complete.
    while response is None or response['status'] == 'Running':
        print('Waiting for query to complete ...')
        time.sleep(1)
        response = cw_client.get_query_results(queryId=query_id)

    issue_detail = ''
    # NOTE: In my situation, we only care about the first message because we expect all logs are the same.
    for item in response['results'][0]:
        if item['field'] == '@message':
            issue_detail = item['value']
            break

    return issue_detail

Query DynamoDB

import boto3
from boto3.dynamodb.conditions import Key

def query_dynamodb_with_condition(key_conditionn_exp):
    """
    Query dynamodb with certain condition_exp (Query not Scan)
    :param key_conditionn_exp: eg. Key('id').eq(certain_id) & Key('sk').begins_with('example::')
    :return: query results list
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('some-dynamodb-name')

    response = table.query(KeyConditionExpression=key_conditionn_exp)
    items = response['Items']

    # filter item if we have further conditions.
    for item in items:
        pass

    return items

Scan DynamoDB

对DynamoDB做scan的时候,有个坑是AWS的DynamoDB单次scan是有上限的,所以为了做到full scan,需要在代码里面有一些处理

def scan_dynamodb_with_condition(filter_condition_exp):
    """
    Full scan dynamodb with certain condition_exp
    :param filter_condition_exp: eg. Attr('sk').eq('my_sk') & Attr('name').begins_with('Jone') & Attr('isDeleted').eq(False)
    :return: scan results list
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('some-dynamo-table')

    response = table.scan(FilterExpression=filter_condition_exp)

    # Loop to do full scan
    results = response['Items']
    index = 1
    while 'LastEvaluatedKey' in response:
        print(f'scanning....{index}')
        index += 1
        response = table.scan(
            ExclusiveStartKey=response['LastEvaluatedKey'],
            FilterExpression=filter_condition_exp)

        results.extend(response['Items'])
        print(len(results))

    return results

List S3 objects and read contents

读取S3某个路径下的所有objects也有一个坑,就是默认单次get object的上限是1000个,所以如果想做到full list,也需要做特定的处理。

def get_all_s3_objects(s3, **base_kwargs):
    """
    Private method to list all files under path
    :param s3: s3 client using boto3.client('s3')
    :param base_kwargs: scan args
    :return: yield file path to caller
    """
    continuation_token = None
    while True:
        list_kwargs = dict(MaxKeys=1000, **base_kwargs)
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token

        response = s3.list_objects_v2(**list_kwargs)
        yield from response.get('Contents', [])

        if not response.get('IsTruncated'):  # At the end of the list?
            break

        continuation_token = response.get('NextContinuationToken')


def main():
    bucket_name = 'my-bucket-name'
    s3_client = boto3.client('s3')
    # using prefix to define search folder
    prefix = 'this-is-some-path-without-prefix-and-postfix-slash'

    file_paths = []
    for file in get_all_s3_objects(s3_client, Bucket=bucket_name, Prefix=prefix):
        file_paths.append(file['Key'])

    print(f'length of file_paths: {len(file_paths)}')
    with open('./file_paths_results.json', 'w') as f:
        f.write(json.dumps(file_paths))
        print('finished writing file paths into json file')

Read S3 file contents

在读取S3文件的内容时,我们遇到了文件Body里的内容(来自AWS SQS的message)无法正确的转换为json的问题,因为时间问题,没有太深入地研究,只是简单地做了一些非json语法字串的替换,把内容拿出来了,后面可以再研究一下这种文件内容需要怎么正确加载到json里。

import json
import re
from pprint import pprint

import boto3
from dynamodb_json import json_util

def read_file_contents(s3client, bucket, path):
    """
    Read a file content with it's key (filepath)
    :param s3client: eg. boto3.client('s3')
    :param bucket: eg. 'some-bucket-name'
    :param path: eg. 'some-path-to-my-file-with-postfix-no-slash-prefix'
    :return: file contents in json format
    """
    file_obj = s3client.get_object(
        Bucket=bucket,
        Key=path)
    
    # open the file object and read it into the variable filedata.
    file_data = file_obj['Body'].read()

    # TODO: we did some ugly string replace here.. will fix this later
    print_str = json_util.loads(file_data).replace('\\', '').replace('""', '"').replace('"Body":"', '"Body":').replace(
        '}}}"}', '}}}}').replace('= "', '- ').replace('" Or', ' -').replace('" And', ' -')
    
    json_obj = json_util.loads(print_str)

    # NOTE: we use regex to match what we want.
    # match = re.findall('someKey":{"S":"(.*?)"', print_str)
    # if match:
    #     pprint(f'find key: {match[0]}')
    #     return match[0]
    # else:
    #     print(f'no key found!')
    #     return None

    return json_obj

本文作为此次生产环境数据问题Investigate的解决过程,记录在这里,数据已经经过脱敏,请结合自己的实际环境进行配置。

Categories: Python