首页 > 解决方案 > Dagster 在另一个实体中使用实体的 AssetMaterialization(包括可重现的代码)

问题描述

所以假设我有两个固体。第一个进行一些计算并将文件写入磁盘。第二个实体获取该文件并用它做其他事情,但它需要它的文件系统路径才能打开它。我可以使用两个yields (一个用于 the AssetMaterialization,另一个用于 the str Output)并明确地将 theOutput放入第二个可靠调用中:

from dagster import (AssetKey, AssetMaterialization, EventMetadataEntry,
                     Output, execute_pipeline, pipeline, solid)

@solid
def yield_asset(context):
    yield AssetMaterialization(
        asset_key=AssetKey('my_dataset'),
        description='Persisted result to storage',
        metadata_entries=[
            EventMetadataEntry.text('Text-based metadata for this event',
                                    label='text_metadata'),
            EventMetadataEntry.fspath('/path/to/data/on/filesystem'),
            EventMetadataEntry.url('http://mycoolsite.com/url_for_my_data',
                                   label='dashboard_url'),
        ],
    )
    yield Output('/path/to/data/on/filesystem')


@solid
def print_asset_path(context, asset_path: str):
    # do stuff with `asset_path`
    context.log.info(asset_path)


@pipeline
def some_pipeline():
    asset_path = yield_asset()
    print_asset_path(asset_path)


if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)

这很好用,您应该在日志 ( 2021-03-16 13:23:29 - dagster - INFO - system - 366248ec-6a83-462f-b62f-9fb2514f6f80 - print_asset_path - /path/to/data/on/filesystem) 和AssetMaterializationin中获得信息消息dagit

但是,这有点不方便,因为我需要使用我需要Output的文件系统路径显式生成一个。是否有可能以及如何AssetMaterialization在第二个实体中引用 并直接使用其属性?

类似的东西(不起作用):

@solid
def print_asset_path(context):
    asset_path = context.assets.get_asset_by_key(`my_key`).fspath
    # do stuff with `asset_path`
    context.log.info(asset_path)

标签: pythonpipelinedagster

解决方案


您提供的代码是目前在 Dagster 中完成此任务的最佳方式。

如果在实体本身执行之前知道 fspath,那么这两个问题中概述的方向(尚未实现)可能会提供更优雅的解决方案:


推荐阅读