首页 > 解决方案 > Python boto3 异常处理

问题描述

我想创建一个具有多个属性的 S3 存储桶:

def create_s3_bucket(name):
    s3Client = boto3.client('s3')

    try:
        s3Client.create_bucket(
            Bucket=name,
            ACL="private",
            CreateBucketConfiguration={
                'LocationConstraint': 'eu-central-1',
            },
        )
    except ClientError as e:
        print(e)

    try:
        s3Client.put_public_access_block(
            Bucket=name,
            PublicAccessBlockConfiguration={
                'BlockPublicAcls': True,
                'IgnorePublicAcls': True,
                'BlockPublicPolicy': True,
                'RestrictPublicBuckets': True
            }
        )
    except ClientError as e:
        print(e)

    try:

        s3Client.put_bucket_versioning(
            Bucket=name,
            VersioningConfiguration={
                'Status': 'Enabled'
            }
        )
    except ClientError as e:

        s3Client.put_bucket_tagging(
            Bucket=name,
            Tagging={
                'TagSet': [
                    {
                        'Key': 'firstTag',
                        'Value': 'firstValue'
                    },
                ]
            }
        )
    except ClientError as e:
        print(e)

到目前为止,这有效。我的问题是:有没有更好、更优雅的方式来处理 ClientError,因为我一直在代码中重复自己?如果我将每个 API 调用放入一个 try catch 块中,那么可能会出现一个调用没有发生事件的情况。

标签: pythonpython-3.xamazon-web-servicesexceptionboto3

解决方案


您可以将所有这些放在同一个 try 块中,这最终会阻止它尝试无法执行的功能(即从未创建过存储桶)。

如果引发异常,它应该返回除打印之外的其他内容以识别此失败,以防止函数的调用者继续执行逻辑。

此外,在您的异常声明中,如果您的存储桶已创建但无法继续,您可以回滚。

此代码可能如下所示

def create_s3_bucket(name):
    s3Client = boto3.client('s3')

    try:
        s3Client.create_bucket(
            Bucket=name,
            ACL="private",
            CreateBucketConfiguration={
                'LocationConstraint': 'eu-central-1',
            },
        )

        s3Client.put_public_access_block(
            Bucket=name,
            PublicAccessBlockConfiguration={
                'BlockPublicAcls': True,
                'IgnorePublicAcls': True,
                'BlockPublicPolicy': True,
                'RestrictPublicBuckets': True
            }
        )

        s3Client.put_bucket_versioning(
            Bucket=name,
            VersioningConfiguration={
                'Status': 'Enabled'
            }
        )

        s3Client.put_bucket_tagging(
            Bucket=name,
            Tagging={
                'TagSet': [
                    {
                        'Key': 'firstTag',
                        'Value': 'firstValue'
                    },
                ]
            }
        )
    except ClientError as e:
        print(e)
        #Do some tidy up (delete bucket or notify)
        return false

    return true


推荐阅读