python - Flyte 0.16.2:加载 Blob 时出错 - 如何让 Types.Blob.fetch() 在任务修饰函数中工作?
问题描述
我有这样的 Flyte 任务功能:
@task
def do_stuff(framework_obj):
framework_obj.get_outputs() # This calls Types.Blob.fetch(some_uri)
尝试使用 加载 blob URI flytekit.sdk.types.Types.Blob.fetch
,但收到此错误:
ERROR:flytekit: Exception when executing No temporary file system is present. Either call this method from within the context of a task or surround with a 'with LocalTestFileSystem():' block. Or specify a path when calling this function. Note: Cleanup is not automatic when a path is specified.
我可以确认我可以with LocalTestFileSystem()
在测试中使用 , 加载 blob,但是当实际尝试运行工作流时,我不确定为什么会出现此错误,因为调用 blob-processing 的函数被装饰了,@task
所以它绝对是 Flyte任务。我还确认了任务节点存在于 Flyte Web 控制台上。
错误引用的路径是什么以及如何正确调用此函数?
使用 Flyte 版本 0.16.2
解决方案
您能否提供有关代码的更多信息?这是flytekit 0.15.x版?我有点困惑,因为那个版本不应该有@task
装饰器。它应该只具有@python_task
较旧的 API。如果你想使用新的 python 原生类型 API,你应该安装 flytekit==0.17.0。
另外,您能否指出您正在查看的文档?我们最近对文档进行了相当多的更新,可能对此有些困惑。 这些都是值得一看的例子。还有两个新的 Python 类,FlyteFile和FlyteDirectory,它们取代了 flytekit 中的 Blob 类(尽管这仍然是 IDL类型的名称)。
(会留下这个作为评论,但我还没有声誉。)
一些有助于获取输出和从文件输出中读取的代码
@task
def task_file_reader():
client = SynchronousFlyteClient("flyteadmin.flyte.svc.cluster.local:81", insecure=True)
exec_id = WorkflowExecutionIdentifier(
domain="development",
project="flytesnacks",
name="iaok0qy6k1",
)
data = client.get_execution_data(exec_id)
lit = data.full_outputs.literals["o0"]
ctx = FlyteContext.current_context()
ff = TypeEngine.to_python_value(ctx, lv=lit,
expected_python_type=FlyteFile)
with open(ff, 'rb') as fh:
print(fh.readlines())
推荐阅读
- javascript - 什么正则表达式从Javascript中字符串的开头和结尾修剪任意标点符号?
- azure - 如何在没有 DVD 的情况下安装 Team Foundation Server 2010?
- powershell - 尝试使用 Powershell 消除我们使用的软件的不需要的安装。(GUID)
- apache-spark - spark repartition / executor inconsistencies commandline vs jupyter
- python - 没有重叠单词的句子的 Fuzzywuzzy 分数高于那些有一些重叠的句子?
- r - 使用条件将列更改为单独的数据框
- json - 未在 Wikidata 或 DBpedia 上的 Wikipedia 表数据的 JSON,下一步?
- php - MySQL全文搜索修改,任意顺序搜索词
- c# - 将 XML 转换为 JSON 到 c# 对象
- ruby-on-rails - 为 Rails 创建简单的站点密码 - 没有用户名