首页 > 解决方案 > 如何为 AWS Athena 服务编写 Pytest

问题描述

我正在为使用 AThena 的 lambda 编写 pytest,使用以下代码创建 pytest.fixture:

@pytest.fixture()
def moto_boto_athena():
    @mock_athena
    def boto_resource_athena():
        res = boto3.client('athena')
        return res
    return boto_resource_athena

我的 Lambda 有以下代码:

response = get_athena_client().start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={
            'Database': athena_database
        },
        ResultConfiguration={
            'OutputLocation': athena_output_bucket,
        }
    )

    query_status = None
    #Check the status of QueryExecution
    while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
        query_status = get_athena_client(). \
            get_query_execution(QueryExecutionId=response['QueryExecutionId'])['QueryExecution']['Status']['State']
        if query_status == 'FAILED' or query_status == 'CANCELLED':
            raise Exception

但是,一旦我开始执行测试,就会出现以下异常:

raise NotImplementedError(
>           "The {0} action has not been implemented".format(action)
        )
E       NotImplementedError: The start_query_execution action has not been implemented

..\..\..\..\..\AppData\Roaming\Python\Python36\site-packages\moto\core\responses.py:393: NotImplementedError

请指导。

标签: python-3.xamazon-web-servicesaws-lambdapytestamazon-athena

解决方案


推荐阅读