python - Dagster 在另一个实体中使用实体的 AssetMaterialization(包括可重现的代码)
问题描述
所以假设我有两个固体。第一个进行一些计算并将文件写入磁盘。第二个实体获取该文件并用它做其他事情,但它需要它的文件系统路径才能打开它。我可以使用两个yield
s (一个用于 the AssetMaterialization
,另一个用于 the str
Output
)并明确地将 theOutput
放入第二个可靠调用中:
from dagster import (AssetKey, AssetMaterialization, EventMetadataEntry,
Output, execute_pipeline, pipeline, solid)
@solid
def yield_asset(context):
yield AssetMaterialization(
asset_key=AssetKey('my_dataset'),
description='Persisted result to storage',
metadata_entries=[
EventMetadataEntry.text('Text-based metadata for this event',
label='text_metadata'),
EventMetadataEntry.fspath('/path/to/data/on/filesystem'),
EventMetadataEntry.url('http://mycoolsite.com/url_for_my_data',
label='dashboard_url'),
],
)
yield Output('/path/to/data/on/filesystem')
@solid
def print_asset_path(context, asset_path: str):
# do stuff with `asset_path`
context.log.info(asset_path)
@pipeline
def some_pipeline():
asset_path = yield_asset()
print_asset_path(asset_path)
if __name__ == "__main__":
result = execute_pipeline(some_pipeline)
这很好用,您应该在日志 ( 2021-03-16 13:23:29 - dagster - INFO - system - 366248ec-6a83-462f-b62f-9fb2514f6f80 - print_asset_path - /path/to/data/on/filesystem
) 和AssetMaterialization
in中获得信息消息dagit
。
但是,这有点不方便,因为我需要使用我需要Output
的文件系统路径显式生成一个。是否有可能以及如何AssetMaterialization
在第二个实体中引用 并直接使用其属性?
类似的东西(不起作用):
@solid
def print_asset_path(context):
asset_path = context.assets.get_asset_by_key(`my_key`).fspath
# do stuff with `asset_path`
context.log.info(asset_path)
解决方案
您提供的代码是目前在 Dagster 中完成此任务的最佳方式。
如果在实体本身执行之前知道 fspath,那么这两个问题中概述的方向(尚未实现)可能会提供更优雅的解决方案:
推荐阅读
- android - LocationAwareException:任务“:app:processDebugManifest”执行失败
- java - 为对象创建定义的值
- go - 如果查询限制很大,goroutines 中的 GORM 会冻结
- java - Java - 扫描仪读取没有赋值的字符串(已解决)
- r - 表频率和直方图之间的差异... geom_histogram 向下取整?
- html - 数组的每个值的垫输入
- sql - 尝试对包含文本和整数的列运行计算时出错:转换 varchar 值时转换失败
- aws-lambda - 触发 Lambda 的 SQS 从不会在 Records 中发送超过一条消息
- javascript - 未捕获的类型错误:无法在 XMLHttpRequest.xhttp.onreadystatechange 处设置属性“innerHTML”为空
- android - ExoPlayer 不播放视频