首页 > 解决方案 > 在 Kubeflow Pipelines 中,如何将元素列表发送到轻量级 python 组件?

问题描述

我正在尝试将元素列表作为 PipelineParameter 发送到轻量级组件。
这是重现该问题的示例。这是功能:

def my_func(my_list: list) -> bool:
    print(f'my_list is {my_list}')
    print(f'my_list is of type {type(my_list)}')
    print(f'elem 0 is {my_list[0]}')
    print(f'elem 1 is {my_list[1]}')
    return True

如果我用这个执行它:

test_data = ['abc', 'def']
my_func(test_data)

它的行为符合预期:

my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def

但是如果我将它包装在一个操作中并设置一个管道:

import kfp

my_op = kfp.components.func_to_container_op(my_func)

@kfp.dsl.pipeline()
def my_pipeline(my_list: kfp.dsl.PipelineParam = kfp.dsl.PipelineParam('my_list', param_type=kfp.dsl.types.List())):
    my_op(my_list)

kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')

然后运行管道:

client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'my_list': test_data})

然后似乎在某些时候我的列表被转换为字符串!

my_list is ['abc', 'def']
my_list is of type <class 'str'>
elem 0 is [
elem 1 is '

标签: kubeflowkubeflow-pipelines

解决方案


这是我发现的一种解决方法,将参数序列化为 json 字符串。不确定这真的是最好的方法......

裸函数变为:

def my_func(json_arg_str: str) -> bool:
    import json
    args = json.loads(json_arg_str)
    my_list = args['my_list']
    print(f'my_list is {my_list}')
    print(f'my_list is of type {type(my_list)}')
    print(f'elem 0 is {my_list[0]}')
    print(f'elem 1 is {my_list[1]}')
    return True

只要您将 args 作为 json 字符串而不是列表传递,它仍然有效:

test_data = '{"my_list":["abc", "def"]}' my_func(test_data)

这会产生预期的结果:

my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def

现在管道更改为接受 astr而不是 aPipelineParam类型kfp.dsl.types.List

import kfp 

my_op = kfp.components.func_to_container_op(my_func)

@kfp.dsl.pipeline()
def my_pipeline(json_arg_str: str):
    my_op(json_arg_str)

kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')

其中,当这样执行时:

client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'json_arg_str': test_data})

产生相同的结果:

my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def

尽管它有效,但我仍然觉得这种解决方法很烦人。如果不允许作为 List 的 PipelineParam,那么 kfp.dsl.types.List的意义何在?


推荐阅读