python - Lambda 实现事务
问题描述
我们创建了一个 lambda,它按计划将消息从 DL SQS 队列移动到 SQS 队列(目标)。作为其中的一部分,我想实现交易。
基本上将消息复制到目标队列,然后删除 DL 队列(源)中的消息。但是在任何情况下,将消息复制到目标队列后,并且无法删除源队列中的消息,都应该从目标队列中删除该消息。
这是我的源代码
import json
import boto3
import sys
import sys
def get_messages_from_queue(sqs_client, queue_url, max_message_count):
"""Generates messages from an SQS queue.
Note: this continues to generate messages until the queue is empty.
Every message on the queue will be deleted.
:param queue_url: URL of the SQS queue to read.
See https://alexwlchan.net/2018/01/downloading-sqs-queues/
"""
processed_message_count = 0
while processed_message_count < max_message_count:
#print("Max Mesage Count: " + str(max_message_count))
remaining_message_count = max_message_count - processed_message_count
#print("Remaining messages: " + str(remaining_message_count))
receive_message_count = min(10, remaining_message_count)
get_resp = sqs_client.receive_message(
QueueUrl=queue_url, AttributeNames=["All"], MaxNumberOfMessages=receive_message_count
)
#print("Actual response:")
#print(get_resp)
try:
#print("Number of messages receieved: " + str(len(get_resp["Messages"])))
yield from get_resp["Messages"]
except KeyError:
return
entries = [
{"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}
for msg in get_resp["Messages"]
]
resp = sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
if len(resp["Successful"]) != len(entries):
raise RuntimeError(
f"Failed to delete messages: entries={entries!r} resp={resp!r}"
)
processed_message_count += len(get_resp["Messages"])
print("After deleting, number of processed messages are: " + str(processed_message_count))
def lambda_handler(event, context):
max_message_count = event['MSG_TRANSFER_LIMIT']
src_queue_url = event["SRC_QUEUE_URL"]
dst_queue_url = event["DEST_QUEUE_URL"]
if src_queue_url == dst_queue_url:
sys.exit("Source and destination queues cannot be the same.")
sqs_client = boto3.client("sqs")
#while processed_message_count < max_message_count:
for message in get_messages_from_queue(sqs_client, src_queue_url, max_message_count):
response = sqs_client.send_message(QueueUrl=dst_queue_url, MessageBody=message["Body"])
print(json.loads(message["Body"]['records'][0]))
#print(response)
return {
'ProcessedMessageCount': max_message_count
}
解决方案
无法从 Amazon SQS 队列中检索特定消息。代码将调用receive_messages()
并获取队列中的任何内容。无法选择或过滤将返回哪些消息。
坦率地说,如果您担心源消息不会删除,那么我建议您实施重试代码以再次尝试删除。无法删除很可能是由于暂时的网络错误(重试应该修复)或消息已被删除的事实。
推荐阅读
- python - Clean up unlabeled pixels in image segmentation
- mysql - mysql - Applying an outer join to a complex statement
- angular - Angular:如何管理 *ngFor 中的单个复选框
- javascript - 如何使用 AngularJS 根据下拉选择值禁用输入文本框?
- mysql - 如何为 MySQL 查询中的所有记录生成唯一的 UUID?
- php - laravel Can't write image data to path
- matlab - reading the data of a file into matlab, editing it, and saving it elsewhere
- html - 根据从另一个字段中选择动态填充字段
- java - 从用户输入获取双倍值时出错
- android - 在 Android 应用程序中访问声音资产时如何区分文件和内容 URI