- Query AWS CloudWatch
- Query DynamoDB
- Scan DynamoDB
- List S3 objects and read contents
- Read S3 file contents
最近在工作中需要对AWS上的部分资源进行查询和交叉分析,虽然场景都比较简单,但是这种半机械的工作当然还是交给Python来搞比较合适。AWS为Python提供的SDK库叫做boto3,所以我们建立一个Python项目,Interpreter选择的是venv解析,再将boto3安装到项目中,下面就可以开始愉快地写代码了。这个过程中有一些坑,记录在这里,以便后续查阅。
Query AWS CloudWatch
根据一定的搜索条件去CloudWatch中查找相关的log记录。
import boto3
def query_cloudwatch_with_condition(log_group, query, start_time, end_time):
"""
Search CloudWatch logs by some conditions.
:param log_group: eg. '/aws/some_log_group'
:param query: eg. f"fields @timestamp, @message \
| sort @timestamp desc \
| filter @message like /(?i)(some_filter)/ \
| filter @message like /Reason:\sError:/ \
| limit 10 \
| display @message"
:param start_time: eg. int((datetime.today() - timedelta(days=5)).timestamp())
:param end_time: eg. int(datetime.now().timestamp())
:return: log message string.
"""
cw_client = boto3.client('logs')
start_query_response = cw_client.start_query(
logGroupName=log_group,
startTime=start_time,
endTime=end_time,
queryString=query,
)
query_id = start_query_response['queryId']
response = None
# NOTE: Must wait for query to complete.
while response is None or response['status'] == 'Running':
print('Waiting for query to complete ...')
time.sleep(1)
response = cw_client.get_query_results(queryId=query_id)
issue_detail = ''
# NOTE: In my situation, we only care about the first message because we expect all logs are the same.
for item in response['results'][0]:
if item['field'] == '@message':
issue_detail = item['value']
break
return issue_detail
Query DynamoDB
import boto3
from boto3.dynamodb.conditions import Key
def query_dynamodb_with_condition(key_conditionn_exp):
"""
Query dynamodb with certain condition_exp (Query not Scan)
:param key_conditionn_exp: eg. Key('id').eq(certain_id) & Key('sk').begins_with('example::')
:return: query results list
"""
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('some-dynamodb-name')
response = table.query(KeyConditionExpression=key_conditionn_exp)
items = response['Items']
# filter item if we have further conditions.
for item in items:
pass
return items
Scan DynamoDB
对DynamoDB做scan的时候,有个坑是AWS的DynamoDB单次scan是有上限的,所以为了做到full scan,需要在代码里面有一些处理
def scan_dynamodb_with_condition(filter_condition_exp):
"""
Full scan dynamodb with certain condition_exp
:param filter_condition_exp: eg. Attr('sk').eq('my_sk') & Attr('name').begins_with('Jone') & Attr('isDeleted').eq(False)
:return: scan results list
"""
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('some-dynamo-table')
response = table.scan(FilterExpression=filter_condition_exp)
# Loop to do full scan
results = response['Items']
index = 1
while 'LastEvaluatedKey' in response:
print(f'scanning....{index}')
index += 1
response = table.scan(
ExclusiveStartKey=response['LastEvaluatedKey'],
FilterExpression=filter_condition_exp)
results.extend(response['Items'])
print(len(results))
return results
List S3 objects and read contents
读取S3某个路径下的所有objects也有一个坑,就是默认单次get object的上限是1000个,所以如果想做到full list,也需要做特定的处理。
def get_all_s3_objects(s3, **base_kwargs):
"""
Private method to list all files under path
:param s3: s3 client using boto3.client('s3')
:param base_kwargs: scan args
:return: yield file path to caller
"""
continuation_token = None
while True:
list_kwargs = dict(MaxKeys=1000, **base_kwargs)
if continuation_token:
list_kwargs['ContinuationToken'] = continuation_token
response = s3.list_objects_v2(**list_kwargs)
yield from response.get('Contents', [])
if not response.get('IsTruncated'): # At the end of the list?
break
continuation_token = response.get('NextContinuationToken')
def main():
bucket_name = 'my-bucket-name'
s3_client = boto3.client('s3')
# using prefix to define search folder
prefix = 'this-is-some-path-without-prefix-and-postfix-slash'
file_paths = []
for file in get_all_s3_objects(s3_client, Bucket=bucket_name, Prefix=prefix):
file_paths.append(file['Key'])
print(f'length of file_paths: {len(file_paths)}')
with open('./file_paths_results.json', 'w') as f:
f.write(json.dumps(file_paths))
print('finished writing file paths into json file')
Read S3 file contents
在读取S3文件的内容时,我们遇到了文件Body里的内容(来自AWS SQS的message)无法正确的转换为json的问题,因为时间问题,没有太深入地研究,只是简单地做了一些非json语法字串的替换,把内容拿出来了,后面可以再研究一下这种文件内容需要怎么正确加载到json里。
import json
import re
from pprint import pprint
import boto3
from dynamodb_json import json_util
def read_file_contents(s3client, bucket, path):
"""
Read a file content with it's key (filepath)
:param s3client: eg. boto3.client('s3')
:param bucket: eg. 'some-bucket-name'
:param path: eg. 'some-path-to-my-file-with-postfix-no-slash-prefix'
:return: file contents in json format
"""
file_obj = s3client.get_object(
Bucket=bucket,
Key=path)
# open the file object and read it into the variable filedata.
file_data = file_obj['Body'].read()
# TODO: we did some ugly string replace here.. will fix this later
print_str = json_util.loads(file_data).replace('\\', '').replace('""', '"').replace('"Body":"', '"Body":').replace(
'}}}"}', '}}}}').replace('= "', '- ').replace('" Or', ' -').replace('" And', ' -')
json_obj = json_util.loads(print_str)
# NOTE: we use regex to match what we want.
# match = re.findall('someKey":{"S":"(.*?)"', print_str)
# if match:
# pprint(f'find key: {match[0]}')
# return match[0]
# else:
# print(f'no key found!')
# return None
return json_obj
本文作为此次生产环境数据问题Investigate的解决过程,记录在这里,数据已经经过脱敏,请结合自己的实际环境进行配置。