python-3.x - 如何将一个 cdk 类的输出动态绑定到另一个 cdk 类
问题描述
所以我在 CDK 中创建了 2 个不同的 python 类。一类是LambdaAutomationTaskStack
创建少量 lambda 函数并使用 lambda 调用将它们链接到步骤函数任务。所以简而言之,LambdaAutomationTaskStack
创建一个列表类型变量lambda_attached_task_names
,其中包含与 lambda 关联的任务列表。
class LambdaAutomationTaskStack(cdk.Stack):
def __init__(self, scope: cdk.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
self.lambda_attached_task_names = list()
self.lambda_attached_task_names.append(self.prepare_check_lambda_function_a())
self.lambda_attached_task_names.append(self.prepare_check_lambda_function_b())
def prepare_check_lambda_function_a(self):
check_lambda_function_a = _lambda.Function(self, 'lambda_function_a',
code=_lambda.Code.from_asset('lambdas/lambda_function_a'),
handler='lambda_function_a.handler',
function_name=f'lambda_function_a',
runtime=_lambda.Runtime.PYTHON_3_8,
timeout=cdk.Duration.seconds(30),
)
check_lambda_function_a_as_task = sfn_tasks.LambdaInvoke(self, 'check_lambda_function_a_as_task',
lambda_function=check_lambda_function_a,
result_path='$.check_function_a')
return check_lambda_function_a_as_task
def prepare_check_lambda_function_b(self):
check_lambda_function_b = _lambda.Function(self, 'lambda_function_b',
code=_lambda.Code.from_asset('lambdas/lambda_function_b'),
handler='lambda_function_b.handler',
function_name=f'lambda_function_b',
runtime=_lambda.Runtime.PYTHON_3_8,
timeout=cdk.Duration.seconds(30),
)
check_lambda_function_b_as_task = sfn_tasks.LambdaInvoke(self, 'check_lambda_function_b_as_task',
lambda_function=check_lambda_function_b,
result_path='$.check_function_b')
return check_lambda_function_b_as_task
另一个类StepFunctionAutomationStack
接受这个list of tasks
变量lambda_attached_task_names
作为 step-function cdk 堆栈的输入的一部分。
现在在这堂课中,我如何创建definition
如下:
假设我lambda_attached_task_names
有 2 个值,它应该创建如下定义:
definition = sfn.Chain.start(check_lambda_function_a_as_task)\
.next(check_lambda_function_b_as_task)
假设我的lambda_attached_task_names
then 定义中有 5 个任务应该如下所示:
因此,我进行了更改以创建如下定义:
definition = "sfn.Chain.start"
for index, value in enumerate(task_details):
if index == 0:
definition += "(" + str(value) + ")"
elif index < len(task_details) - 1:
definition += ".next(" + str(value) + ")"
else:
definition += ".next(" + str(value) + ")"
现在,如果我将它传递给我的状态机:
sfn.StateMachine(
self, "StateMachine",
definition=definition,
timeout=cdk.Duration.seconds(30),
state_machine_name='anl-some-stack'
)
现在,这让我
jsii.errors.JSIIError: Expected object reference, got "sfn.Chain.start(<aws_cdk.aws_stepfunctions_tasks.LambdaInvoke object at 0x7f0f666da650>).next(<aws_cdk.aws_stepfunctions_tasks.LambdaInvoke object at 0x7f0f672daad0>)"
那么,我怎样才能将 class 中关联的实际任务与 class 关联LambdaAutomationTaskStack
,StepFunctionAutomationStack
而不是作为对象?
这两个类在主python中被调用和引用app.py
更新:我尝试在我的主 app.py 中使用这个 agian 检查值
for index, value in enumerate(lambda_automation_stack.lambda_attached_task_names):
print(value.__dict__)
这给了我这样的:
{'__jsii_ref__': CreateResponse(ref='@aws-cdk/aws-stepfunctions-tasks.LambdaInvoke@10013', interfaces=None)}
{'__jsii_ref__': CreateResponse(ref='@aws-cdk/aws-stepfunctions-tasks.LambdaInvoke@10018', interfaces=None)}
但是,如果我将这两个类合并到一个类中,它们都可以正常工作。
解决方案
我通常用 Typescript 而不是 Python 编写 CDK 代码,所以请耐心等待。
您似乎正在为定义创建一个字符串,但 StateMachineProps.Definition 属性需要一个 IChainable 对象。
因此,您的代码应该构建一个工作流而不是一个字符串,例如(请原谅我,这是未经测试的):
definition = sfn.Chain.start(check_lambda_function_a_as_task)
current_step = definition
for index, value in enumerate(task_details):#assuming task_details is an IChainable object
current_step.next(value)
current_step = value
#and leave this code the same
sfn.StateMachine(
self, "StateMachine",
definition=definition,
timeout=cdk.Duration.seconds(30),
state_machine_name='anl-some-stack'
)
推荐阅读
- javascript - 如何从 LocalStorage 编辑某个对象的属性,然后将其放回 Vanilla Javascript?
- javascript - 如何使用 React useState Hook 设置显示样式属性
- json.net - 有没有办法在反序列化(Newtonsoft)期间忽略“意外字符”错误?
- amazon-web-services - 如何使用代理协议版本 2 通过 AWS 网络负载均衡器获取客户端的真实 IP 地址?
- prolog - prolog alpha-beta 意外结果
- python - Numpy 矩阵看起来像一个列表数组?
- python - 图像切片器无法通过第一个切片 - 枕头
- r - 如何对齐大括号RStudio
- scala - 如何将火花数据帧转换为scala中的结构列表
- json - 使用 Flask 创建了一个 Web 服务。但是在 HTML 标签中获取输出