首页 > 解决方案 > 如何使用 lambda 函数 (A) 的 n 个结果在 Python 中调用另一个 lambda 处理器函数 (B) 的 n 个并发实例?

问题描述

我有一个返回 n 个 url 的 AWS Lambda 函数 (A)。我想将这些 url 中的每一个单独并同时作为参数传递给另一个 AWS Lambda 函数 (B)。然后函数 B 处理传递的 url 并返回结果。这两个函数都是用 Python 编写的,如果可能的话,我宁愿避免使用其他语言。有没有人有明确的解决方案来解决超时、并发冲突、其他边缘情况和/或错误?

即使分配了最大内存,函数 A 仅需要约 85 秒来设置有效负载并调用函数 B 1,100 次。调用另一个 AWS Lambda 函数的典型时间约为 80 毫秒吗?有更快的方法吗?此外,函数 B 的 CloudWatch Logs 将多个日志流之间的调用分开,因此很难在一个地方查看所有调用,以确认事情是否正确完成和/或以何种顺序和/或任何错误/延迟可能位于何处.

我查看了boto3.client('lambda') 文档

我还利用Using boto 来调用 lambda 函数,我该如何异步执行此操作?AWS Lambda:使用 boto3 调用从另一个 AWS lambda 调用函数以获取我现有的代码。

这是我用于测试的代码。

# Function A - using max Memory setting (3008 MB currently) to speed things up

import boto3
import json

def lambda_handler(event, context):
    #simulate 1,100 urls (more than the default concurrency limit of 1,000)
    n = 1100
    results = range(1, n+1)
    #invoke function B asynchronously
    for result in results:
        payload = {'url' : result}
        boto3.client('lambda').invoke(FunctionName='B', InvocationType='Event', Payload=json.dumps(payload))
    return{'statusCode': 200, 'body': json.dumps('Hello from Lambda!')}
# Function B - using the min Memory setting (128 MB currently)

import json
import time

def lambda_handler(event, context):
    #wait 5 seconds to simulate processing time
    time.sleep(5)
    #process passed payload from function A
    print(event['url'])
    return{'statusCode': 200, 'body': json.dumps('Bye from Lambda!')}


标签: pythonconcurrencyaws-lambdainvoke

解决方案


调用另一个 AWS Lambda 函数的典型时间约为 80 毫秒吗?

这对我来说听起来不是很糟糕,但可能还有一些改进的余地。在查看您的代码时,让我印象深刻的一件事是您一遍又一遍地创建 AWS Lambda 客户端对象。尝试创建一次客户端,如下所示:

client = boto3.client('lambda')
for result in results:
        payload = {'url' : result}
        client.invoke(FunctionName='B', InvocationType='Event', Payload=json.dumps(payload))

通过重用相同的客户端对象,我认为您将看到由于重用与 AWS API 服务器的底层 HTTP 连接而带来的性能改进。

此外,函数 B 的 CloudWatch Logs 将多个日志流之间的调用分开,因此很难在一个地方查看所有调用,以确认事情是否正确完成和/或以何种顺序和/或任何错误/延迟可能位于何处.

您正在处理在多个服务器上运行的一千多个异步进程。在一个地方查看所有这些日志将是一项挑战。您可能会考虑使用CloudWatch Logs Insights 之类的东西。

有没有人有明确的解决方案来解决超时、并发冲突、其他边缘情况和/或错误?

管理超时、并发限制和其他错误的典型模式是将所有事件发送到 SQS 队列并让队列触发您的第二个 Lambda 函数。但是,虽然您的第一个 Lambda 函数将与现在一样快,或者可能更快,

可用于解决其中一些问题的另一种模式是在您的第一个 Lambda 函数中实现指数退避算法。但是,这将需要您的函数代码直接处理重试,而不是依靠 SQS 等其他 AWS 服务来为您处理重试,并且需要在您的 Lambda 函数中添加暂停,这可能会导致第一个函数调用最终在它之前超时已成功触发所有第二个函数调用,这只会创建另一个您必须以某种方式处理的错误情况。


推荐阅读