首页 > 解决方案 > SQS 任务在 DLQ 中进行,尽管在 Lambda 中成功 + 手动删除时也是如此

问题描述

我已经围绕 AWS Lambda 和 Salesforce 构建了自己的应用程序。我有大约 10 个用户在使用我的内部应用程序,所以不要谈论大量使用。

每天,我有大约 500-1000 个 SQS 任务可以在正常的一天处理,其中一项任务可能需要大约 1-60 秒,具体取决于其复杂性。

这是完美的工作。

我创建了一个装饰器,它允许我通过 SQS 处理我的一些函数,这些函数需要使用 FIFO 逻辑进行 ASYNC 处理。一切运作良好。

我的 Lambda 函数最后不会返回任何内容,但会成功完成(标准场景)。但是,我注意到一些任务正在进入我的 DLQ(我只允许处理一次,如果它被表示它会立即进入 DLQ)。

我不明白的是为什么会这样?

所以我添加了手动删除在函数结束时处理的任务。我已经记录了执行 boto3.client.delete_message 时发送的结果,并且我得到了 200 状态,所以一切正常.....但是偶尔(100 次中有 1 次,所以我每天 10 次)案例)我可以看到任务进入 DLQ... 将相同的任务重新处理到我的标准队列中而不更改任何内容...它被成功处理(再次)并被删除(如最初预期的那样)。

对我来说最有问题的是删除消息仍然会以有时进入 DLQ 的方式结束它?可能是什么问题呢 ?

我的异步处理器示例

def process_data(event, context):
    """
    By convention, we need to store in the table AsyncTaskQueueNamea dict with the following parameters:
    - python_module: use to determine the location of the method to call asynchronously
    - python_function: use to determine the location of the method to call asynchronously
    - uuid: uuid to get the params stored in dynamodb
    """

    print('Start Processing Async')

    client = boto3.client('sqs')
    queue_url = client.get_queue_url(QueueName=settings.AsyncTaskQueueName)['QueueUrl']

    # batch size = 1 so only record 1 to process
    for record in event['Records']:
        try:
            kwargs = json.loads(record['body'])

            print(f'Start Processing Async Data Record:\n{kwargs}')
            
            python_module = kwargs['python_module']
            python_function = kwargs['python_function']

            # CALLING THE FUNCTION WE WANTED ASYNC, AND DOING ITS STUFF... (WORKING OK)
            getattr(sys.modules[python_module], python_function)(uuid=kwargs['uuid'], is_in_async_processing=True)

            
            print('End Processing Async Data Record')
            res = client.delete_message(QueueUrl=queue_url, ReceiptHandle=record['receiptHandle'])
            print(f'End Deleting Async Data Record with status: {res}')  # When the problem I'm monitoring occurs, it goes up to this line, with res status = 200 !! That's where I'm losing my mind. I can confirm the uuid in the DLQ being the same as in the queue so we are definitely talking of the same message which has been moved to the DLQ.


        except Exception:
            # set expire to 0 so that the task goes into DLQ
            client.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=record['receiptHandle'],
                VisibilityTimeout=0
            )


        utils.raise_exception(f'There was a problem during async processing. Event:\n'
                              f'{json.dumps(event, indent=4, default=utils.jsonize_datetime)}')

今天使用来自 CloudWatch 的日志的错误示例:初始事件:

{'Records': [{'messageId': '75587372-256a-47d4-905b-62e1b42e2dad', 'receiptHandle': 'YYYYYY", "python_module": "quote.processing", "python_function": "compute_price_data"}', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1621432888344', 'SequenceNumber': '18861830893125615872', 'MessageGroupId': 'compute_price_data', 'SenderId': 'XXXXX:main-app-production-main', 'MessageDeduplicationId': 'b4de6096-b8aa-11eb-9d50-5330640b1ec1', 'ApproximateFirstReceiveTimestamp': '1621432888344'}, 'messageAttributes': {}, 'md5OfBody': '5a67d0ed88898b7b71643ebba975e708', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:eu-west-3:XXXXX:async_task-production.fifo', 'awsRegion': 'eu-west-3'}]}

Res(在调用 delete_message 之后):

End Deleting Async Data Record with status: {'ResponseMetadata': {'RequestId': '7738ffe7-0adb-5812-8701-a6f8161cf411', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '7738ffe7-0adb-5812-8701-a6f8161cf411', 'date': 'Wed, 19 May 2021 14:02:47 GMT', 'content-type': 'text/xml', 'content-length': '215'}, 'RetryAttempts': 0}}

但是... 75587372-256a-47d4-905b-62e1b42e2dad 在此 delete_message 之后位于 DLQ 中。我变得疯狂

标签: aws-lambdaamazon-sqs

解决方案


好的,问题是由于我的 serverless.yml 超时设置为 900,但不是在 AWS 中。我可能已经手动将其更改为 1 分钟,所以我的长任务在 1 分钟后被释放,然后立即进入 DLQ。

因此删除会做任何事情,因为删除时任务已经在 DLQ 中


推荐阅读