首页 > 解决方案 > 如何将一个 cdk 类的输出动态绑定到另一个 cdk 类

问题描述

所以我在 CDK 中创建了 2 个不同的 python 类。一类是LambdaAutomationTaskStack创建少量 lambda 函数并使用 lambda 调用将它们链接到步骤函数任务。所以简而言之,LambdaAutomationTaskStack创建一个列表类型变量lambda_attached_task_names,其中包含与 lambda 关联的任务列表。

class LambdaAutomationTaskStack(cdk.Stack):

    def __init__(self, scope: cdk.Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        self.lambda_attached_task_names = list()
        self.lambda_attached_task_names.append(self.prepare_check_lambda_function_a())
        self.lambda_attached_task_names.append(self.prepare_check_lambda_function_b())

    def prepare_check_lambda_function_a(self):
        check_lambda_function_a = _lambda.Function(self, 'lambda_function_a',
                                                    code=_lambda.Code.from_asset('lambdas/lambda_function_a'),
                                                    handler='lambda_function_a.handler',
                                                    function_name=f'lambda_function_a',
                                                    runtime=_lambda.Runtime.PYTHON_3_8,
                                                    timeout=cdk.Duration.seconds(30),
                                                    )
        check_lambda_function_a_as_task = sfn_tasks.LambdaInvoke(self, 'check_lambda_function_a_as_task',
                                                                   lambda_function=check_lambda_function_a,
                                                                   result_path='$.check_function_a')
        return check_lambda_function_a_as_task

    def prepare_check_lambda_function_b(self):
        check_lambda_function_b = _lambda.Function(self, 'lambda_function_b',
                                                   code=_lambda.Code.from_asset('lambdas/lambda_function_b'),
                                                   handler='lambda_function_b.handler',
                                                   function_name=f'lambda_function_b',
                                                   runtime=_lambda.Runtime.PYTHON_3_8,
                                                   timeout=cdk.Duration.seconds(30),
                                                   )
        check_lambda_function_b_as_task = sfn_tasks.LambdaInvoke(self, 'check_lambda_function_b_as_task',
                                                                 lambda_function=check_lambda_function_b,
                                                                 result_path='$.check_function_b')
        return check_lambda_function_b_as_task

另一个类StepFunctionAutomationStack接受这个list of tasks变量lambda_attached_task_names 作为 step-function cdk 堆栈的输入的一部分。

现在在这堂课中,我如何创建definition 如下:

假设我lambda_attached_task_names有 2 个值,它应该创建如下定义:

definition = sfn.Chain.start(check_lambda_function_a_as_task)\
            .next(check_lambda_function_b_as_task) 

假设我的lambda_attached_task_namesthen 定义中有 5 个任务应该如下所示:

因此,我进行了更改以创建如下定义:

definition = "sfn.Chain.start"
        for index, value in enumerate(task_details):
            if index == 0:
                definition += "(" + str(value) + ")"
            elif index < len(task_details) - 1:
                definition += ".next(" + str(value) + ")"
            else:
                definition += ".next(" + str(value) + ")"

现在,如果我将它传递给我的状态机:

sfn.StateMachine(
            self, "StateMachine",
            definition=definition,
            timeout=cdk.Duration.seconds(30),
            state_machine_name='anl-some-stack'
        )

现在,这让我

jsii.errors.JSIIError: Expected object reference, got "sfn.Chain.start(<aws_cdk.aws_stepfunctions_tasks.LambdaInvoke object at 0x7f0f666da650>).next(<aws_cdk.aws_stepfunctions_tasks.LambdaInvoke object at 0x7f0f672daad0>)"

那么,我怎样才能将 class 中关联的实际任务与 class 关联LambdaAutomationTaskStackStepFunctionAutomationStack而不是作为对象?

这两个类在主python中被调用和引用app.py

更新:我尝试在我的主 app.py 中使用这个 agian 检查值

for index, value in enumerate(lambda_automation_stack.lambda_attached_task_names):
    print(value.__dict__)

这给了我这样的:

{'__jsii_ref__': CreateResponse(ref='@aws-cdk/aws-stepfunctions-tasks.LambdaInvoke@10013', interfaces=None)}
{'__jsii_ref__': CreateResponse(ref='@aws-cdk/aws-stepfunctions-tasks.LambdaInvoke@10018', interfaces=None)}

但是,如果我将这两个类合并到一个类中,它们都可以正常工作。

标签: python-3.xaws-cdk

解决方案


我通常用 Typescript 而不是 Python 编写 CDK 代码,所以请耐心等待。
您似乎正在为定义创建一个字符串,但 StateMachineProps.Definition 属性需要一个 IChainable 对象。
因此,您的代码应该构建一个工作流而不是一个字符串,例如(请原谅我,这是未经测试的):

definition = sfn.Chain.start(check_lambda_function_a_as_task)
current_step = definition
for index, value in enumerate(task_details):#assuming task_details is an IChainable object
    current_step.next(value)
    current_step = value

#and leave this code the same
sfn.StateMachine(
  self, "StateMachine",
  definition=definition,
  timeout=cdk.Duration.seconds(30),
  state_machine_name='anl-some-stack'
)


推荐阅读