首页 > 解决方案 > Lambda 实现事务

问题描述

我们创建了一个 lambda,它按计划将消息从 DL SQS 队列移动到 SQS 队列(目标)。作为其中的一部分,我想实现交易。

基本上将消息复制到目标队列,然后删除 DL 队列(源)中的消息。但是在任何情况下,将消息复制到目标队列后,并且无法删除源队列中的消息,都应该从目标队列中删除该消息。

这是我的源代码

import json
import boto3
import sys
import sys

def get_messages_from_queue(sqs_client, queue_url, max_message_count):
    """Generates messages from an SQS queue.
    Note: this continues to generate messages until the queue is empty.
    Every message on the queue will be deleted.
    :param queue_url: URL of the SQS queue to read.
    See https://alexwlchan.net/2018/01/downloading-sqs-queues/
    """
    processed_message_count = 0

    while processed_message_count < max_message_count:
        #print("Max Mesage Count: " + str(max_message_count))
        remaining_message_count = max_message_count - processed_message_count
        #print("Remaining messages: " + str(remaining_message_count))

        receive_message_count = min(10, remaining_message_count)
        get_resp = sqs_client.receive_message(
            QueueUrl=queue_url, AttributeNames=["All"], MaxNumberOfMessages=receive_message_count
        )

        #print("Actual response:")
        #print(get_resp)

        try:
            #print("Number of messages receieved: " + str(len(get_resp["Messages"])))
            yield from get_resp["Messages"]
        except KeyError:
            return

        entries = [
            {"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}
            for msg in get_resp["Messages"]
        ]

        resp = sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)

        if len(resp["Successful"]) != len(entries):
            raise RuntimeError(
                f"Failed to delete messages: entries={entries!r} resp={resp!r}"
            )
        
        processed_message_count += len(get_resp["Messages"])
        print("After deleting, number of processed messages are: " + str(processed_message_count))

def lambda_handler(event, context):
        max_message_count = event['MSG_TRANSFER_LIMIT']
        src_queue_url = event["SRC_QUEUE_URL"]
        dst_queue_url = event["DEST_QUEUE_URL"]

        if src_queue_url == dst_queue_url:
            sys.exit("Source and destination queues cannot be the same.")

        sqs_client = boto3.client("sqs")

        #while processed_message_count < max_message_count:
            

        for message in get_messages_from_queue(sqs_client, src_queue_url, max_message_count):
            response = sqs_client.send_message(QueueUrl=dst_queue_url, MessageBody=message["Body"])
            print(json.loads(message["Body"]['records'][0]))
            #print(response)

        return {
            'ProcessedMessageCount': max_message_count
        }

标签: pythonamazon-web-servicesaws-lambda

解决方案


无法从 Amazon SQS 队列中检索特定消息。代码将调用receive_messages()并获取队列中的任何内容。无法选择或过滤将返回哪些消息。

坦率地说,如果您担心源消息不会删除,那么我建议您实施重试代码以再次尝试删除。无法删除很可能是由于暂时的网络错误(重试应该修复)或消息已被删除的事实。


推荐阅读