首页 > 解决方案 > aws athena 创建标记的 s3 对象

问题描述

我想运行一个 Athena 查询,它将结果写入 s3,并且所有结果的标签都与运行查询的工作组的标签相似。

即结果

client.start_query_execution(
    QueryString='some query',
    ResultConfiguration={
        'OutputLocation': 's3://path/to/query/bucket/'
    },
    WorkGroup='my_wg'
)

s3://path/to/query/bucket/将使用 workgroup 的标签写入my_wg。有可能吗,如果可以的话怎么办?

提前致谢

标签: amazon-web-servicesamazon-s3amazon-athena

解决方案


Athena 编写的对象上的 S3 事件不提供任何可能对通过 Lambda 函数进行标记有用的信息。我们现在可以确定一个解决方案。

无需编写任何代码的另一种方法是为与不同工作组关联的输出位置设置不同的路径前缀。

如果要继续手动标记,QueryExecutionId则在结果中返回。

您可以检索写入查询结果的对象位置并对其进行标记。

import time
import boto3
from urllib.parse import urlparse


workgroup = "my_wg"
workgroup_tag = {"Key": "workgroup", "Value": workgroup}

athena_client = boto3.client("athena", region_name="us-east-1")
response = athena_client.start_query_execution(
    QueryString="some query",
    ResultConfiguration={
        "OutputLocation": "s3://path/to/query/bucket/"
    },
    WorkGroup=workgroup
)
query_execution_id = response['QueryExecutionId']

state = None
output_location = None

while True:
    response = athena_client.get_query_execution(
        QueryExecutionId=query_execution_id
    )
    query_execution = response["QueryExecution"]
    state = query_execution["Status"]["State"]
    if state in ["QUEUED", "RUNNING"]:
        time.sleep(5)
    else:
        if state == "SUCCEEDED":
            output_location = query_execution["ResultConfiguration"]["OutputLocation"]
        break

# Now proceed to tagging
s3_client = boto3.client("s3", region_name="us-east-1")
if output_location:
    parse_result = urlparse(output_location)
    bucket = parse_result.netloc
    key = parse_result.path.lstrip('/')
    
    # Add tag to existing tags on the object.
    response = s3.get_object_tagging(Bucket=bucket, Key=key)
    tag_set = response["TagSet"]
    tag_set.append(workgroup_tag)
    tagging = {"TagSet": tag_set}

    response = s3.put_object_tagging(Bucket=bucket, Key=key, Tagging=tagging)

由于对标记查询结果有这么多的麻烦。我建议为与不同工作组关联的输出位置配置不同的路径前缀。这样您就不必手动标记它们。


推荐阅读