aws-lambda - 如何从 lambda 触发器将消息返回给 SQS
问题描述
我有从 SQS 队列读取消息的 lambda 触发器。在某些情况下,消息可能尚未准备好进行处理,因此我想将消息放回队列中等待 1 分钟,然后重试。目前,我正在创建此客户记录的另一个副本并将此新副本发布到队列中。我是否有理由/方式将原始记录保留在队列中,而不是创建新记录
def postToQueue(customer):
if 'attemptCount' in customer.keys():
attemptCount = int(customer["attemptCount"]) + 1
else:
attemptCount = 2
customer["attemptCount"] = attemptCount
# Get the service resource
sqs = boto3.resource('sqs')
# Get the queue
queue = sqs.get_queue_by_name(QueueName='testCustomerQueue')
response = queue.send_message(MessageBody=json.dumps(customer), DelaySeconds=60)
print('customer postback: ', customer)
print ('response from writing ot the queue is: ', response)
#main function
for record in event['Records']:
if 'body' in record.keys():
customer = json.loads(record['body'])
print("attempting to process customer", customer, " at: ", datetime.datetime.now())
if (not ifReadyToProcess(customer)):
postToQueue(customer)
else:
processCustomer(customer)
解决方案
这不是 SQS 触发 Lambda 函数的理想设置。
我的测试表明,发送到 SQS 的消息会立即触发 Lambda 函数,即使提供了延迟设置。因此,将消息放回 SQS 队列将导致 Lambda 立即再次触发。
为了避免 Lambda 不断检查消息是否准备好处理的情况,我建议:
- 使用 Amazon CloudWatch Events 按计划(例如每 2 分钟)触发 Lambda 函数
- Lambda 函数应从队列中提取消息并检查它们是否已准备好进行处理。
- 如果它们准备好了,则处理它们并删除它们
- 如果它们还没有准备好,则使用延迟设置将它们推回队列并删除原始消息
请注意,这与让 SQS 直接触发 Lambda 不同。相反,Lambda 函数应该调用ReceiveMessages()
以获取消息本身,这允许延迟函数在检查之间添加一些时间。
另一种选择:您可以通过不删除消息来简单地利用默认可见性超时设置,而不是将消息重新插入队列。从队列中读取但未删除的消息将自动“重新出现”在队列中。您可以将其用作“重试”时间段。但是,这意味着您将需要自己处理死信处理(例如,如果一条消息在n次尝试后处理失败)。
推荐阅读
- python - 在数千个文件夹中查找图像相似性
- vue.js - 注意整个数组以及它的属性之一
- android - 如果我需要为按钮设置一点边框半径,为什么我们需要创建形状?
- vhdl - 为什么不完整的 if 语句在 VHDL 的综合过程中会创建锁存器?
- electron - How to send search text to findInPage in Electron
- sql - 蛋糕食谱的 SQL 模式和成分列表
- c# - C# Visual Studio 认为我的课程提前结束
- python - 从 _DMDA_Vec_array 到 Vec 的 petsc4py 映射以在 TS 中使用
- webhooks - Microsoft Teams 传出 webhook 坏了?
- sql - 使用两个子查询优化 OrientDB 中的 SQL 查询