首页 > 解决方案 > Flyte 0.16.2:加载 Blob 时出错 - 如何让 Types.Blob.fetch() 在任务修饰函数中工作?

问题描述

我有这样的 Flyte 任务功能:

@task
def do_stuff(framework_obj):
    framework_obj.get_outputs()  # This calls Types.Blob.fetch(some_uri)

尝试使用 加载 blob URI flytekit.sdk.types.Types.Blob.fetch,但收到此错误:

ERROR:flytekit: Exception when executing No temporary file system is present.  Either call this method from within the context of a task or surround with a 'with LocalTestFileSystem():' block.  Or specify a path when calling this function.  Note: Cleanup is not automatic when a path is specified.

我可以确认我可以with LocalTestFileSystem()在测试中使用 , 加载 blob,但是当实际尝试运行工作流时,我不确定为什么会出现此错误,因为调用 blob-processing 的函数被装饰了,@task所以它绝对是 Flyte任务。我还确认了任务节点存在于 Flyte Web 控制台上。

错误引用的路径是什么以及如何正确调用此函数?

使用 Flyte 版本 0.16.2

标签: pythonflyte

解决方案


您能否提供有关代码的更多信息?这是flytekit 0.15.x版?我有点困惑,因为那个版本不应该有@task装饰器。它应该只具有@python_task较旧的 API。如果你想使用新的 python 原生类型 API,你应该安装 flytekit==0.17.0。

另外,您能否指出您正在查看的文档?我们最近对文档进行了相当多的更新,可能对此有些困惑。 这些都是值得一看的例子。还有两个新的 Python 类,FlyteFileFlyteDirectory,它们取代了 flytekit 中的 Blob 类(尽管这仍然是 IDL类型的名称)。

(会留下这个作为评论,但我还没有声誉。)

一些有助于获取输出和从文件输出中读取的代码

@task
def task_file_reader():
    client = SynchronousFlyteClient("flyteadmin.flyte.svc.cluster.local:81", insecure=True)
    exec_id = WorkflowExecutionIdentifier(
    domain="development",
    project="flytesnacks",
    name="iaok0qy6k1",
    )
    data = client.get_execution_data(exec_id)
    lit = data.full_outputs.literals["o0"]

    ctx = FlyteContext.current_context()
    ff = TypeEngine.to_python_value(ctx, lv=lit, 
    expected_python_type=FlyteFile)
    with open(ff, 'rb') as fh:
        print(fh.readlines())

推荐阅读