Core compute for solid returned an output multiple times

Problem description

I'm very new to Dagster and couldn't find an answer to my question in the docs.

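I have 2 solids: one yields (str, str) tuples parsed from an XML file, and the other just consumes the tuples and stores objects in the DB with the corresponding fields set. But I'm getting the error `Core compute for solid returned an output multiple times`. I'm pretty sure I'm making a fundamental mistake in the design. Could someone explain how to design this pipeline the right way, or point me to the section of the docs that explains this error?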


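import xml.etree.ElementTree as ET

# Assumes the 2019-era (0.6) Dagster API, where Tuple and String are Dagster types.
from dagster import Output, OutputDefinition, String, Tuple, pipeline, solid

# CPCClassification is the asker's Django model; its import is not shown.


@solid(output_defs=[OutputDefinition(Tuple, 'classification_data')])
def extract_classification_from_file(context, xml_path: String) -> Tuple:
    context.log.info("start")
    root = ET.parse(xml_path).getroot()
    for code_node in root.findall('definition-item'):
        symbol = code_node.find('classification-symbol').text
        title = code_node.find('definition-title').text
        context.log.info(f"{symbol} {title}")
        # Every iteration yields an Output with the same name, 'classification_data',
        # so the second item triggers "returned an output multiple times".
        yield Output((symbol, title), 'classification_data')


@solid()
def load_classification(context, classification_data):
    # objects.create() already saves the row; no extra .save() is needed.
    CPCClassification.objects.create(code=classification_data[0], description=classification_data[1])


@pipeline
def define_classification_pipeline():
    load_classification(extract_classification_from_file())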

Tags: python, django, xml, dagster

Solution


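After searching the dagster codebase for your error, I found where it is raised here. It confirms what I read in the tutorial: "output names must be unique".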
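To see why the loop fails, here is a toy model of that rule in plain Python. It is not Dagster's implementation: `run_compute`, `per_row`, and the (name, value) pairs standing in for `Output` objects are hypothetical, and the rows are dummy data.

```python
from typing import Any, Dict, Iterable, Tuple

# Hypothetical stand-in for Output(value, name): a (name, value) pair.
OutputEvent = Tuple[str, Any]


def run_compute(declared: Iterable[str], events: Iterable[OutputEvent]) -> Dict[str, Any]:
    """Toy model: each declared output name may be yielded at most once."""
    allowed = set(declared)
    results: Dict[str, Any] = {}
    for name, value in events:
        if name not in allowed:
            raise ValueError(f"output {name!r} was not declared")
        if name in results:
            raise ValueError(f"output {name!r} returned multiple times")
        results[name] = value
    return results


def per_row(rows):
    # Mirrors the question: one event per XML row, always under the same name.
    for row in rows:
        yield ("classification_data", row)


rows = [("A01", "title 1"), ("A21", "title 2")]

try:
    run_compute(["classification_data"], per_row(rows))
except ValueError as err:
    print(err)  # output 'classification_data' returned multiple times

# Yielding the whole list once under that name passes the check.
print(run_compute(["classification_data"], [("classification_data", rows)]))
```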

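Because you yield the Output inside a for loop, every iteration emits an output with the same name, 'classification_data', so the output names are not unique and the second yield raises the error.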
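One common way to restructure this, sketched here under the assumption that holding all parsed rows of one file in memory is acceptable, is to have the first step return the whole list as a single output and let the second step loop over it. The sketch is plain Python: `extract_classifications` and `load_classifications` are hypothetical names for the bodies of the two solids, the XML is parsed from a string with `ET.fromstring` (a file would use `ET.parse(xml_path).getroot()` as in the question), and `save` is a hypothetical stand-in for `CPCClassification.objects.create(...)`.

```python
import xml.etree.ElementTree as ET
from typing import Callable, List, Optional, Tuple

Row = Tuple[Optional[str], Optional[str]]


def extract_classifications(xml_text: str) -> List[Row]:
    """Parse every definition-item into (symbol, title) and return them together."""
    root = ET.fromstring(xml_text)
    rows: List[Row] = []
    for item in root.findall("definition-item"):
        # findtext returns the child's text, or None if the child is missing.
        symbol = item.findtext("classification-symbol")
        title = item.findtext("definition-title")
        rows.append((symbol, title))
    return rows


def load_classifications(rows: List[Row], save: Callable[[Optional[str], Optional[str]], None]) -> int:
    """Persist every row through `save` and return how many were stored."""
    for code, description in rows:
        save(code, description)
    return len(rows)


sample = (
    "<root>"
    "<definition-item><classification-symbol>A01</classification-symbol>"
    "<definition-title>first</definition-title></definition-item>"
    "<definition-item><classification-symbol>A21</classification-symbol>"
    "<definition-title>second</definition-title></definition-item>"
    "</root>"
)
rows = extract_classifications(sample)
print(rows)  # [('A01', 'first'), ('A21', 'second')]
```

Wired into Dagster, the first solid would return `rows` once through a single list-valued output, and the second solid would receive the list as its input, so no output name is emitted twice.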


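Update: following up on the issue you opened with dagster, I tested the idea of creating outputs dynamically at runtime, and it works if the dynamic data is generated outside the @solid. However, when I tried to build the dynamic data inside a @solid and use its output as the config input of a successor @solid, the successor @solid did not pick up the updated structure, and I got a dagster.core.errors.DagsterInvariantViolationError.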

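Below is the code I used to verify that dynamic outputs are produced at runtime when the dynamic data is generated outside the solid. I suspect this is a bit of an anti-pattern, though it may be the only option until Dagster matures enough to handle the scenario you describe. Also note that I have not handled doing anything downstream with the yielded output objects.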

"""dagit -f dynamic_output_at_runtime.py -n dynamic_output_at_runtime"""
import random

from dagster import (
    Output,
    OutputDefinition,
    SystemComputeExecutionContext,
    execute_pipeline,
    pipeline,
    solid,
)

# Build a random list of OutputDefinitions at import time, i.e. before the
# solid below is defined, so every execution gets a different set of outputs.
start = 1
stop = 100
limit = random.randint(1, 10)
random_set_of_ints = {random.randint(start, stop) for _ in range(limit)}
output_defs_runtime = [OutputDefinition(name=f'output_{num}') for num in random_set_of_ints]


@solid(output_defs=output_defs_runtime)
def ints_for_all(context: SystemComputeExecutionContext):
    # Each declared output is yielded exactly once; the names are unique
    # because they come from a set.
    for num in random_set_of_ints:
        out_name = f"output_{num}"
        context.log.info(f"output object name: {out_name}")
        yield Output(num, out_name)


@pipeline
def dynamic_output_at_runtime():
    x = ints_for_all()
    # Runs while the pipeline is being defined, so it prints the output
    # handles rather than the values (first line of the output below).
    print(x)


if __name__ == '__main__':
    result = execute_pipeline(dynamic_output_at_runtime)
    assert result.success

Re-running this pipeline yields a different set of outputs each time. Here is one run:

python dynamic_output_at_runtime.py 
_ints_for_all_outputs(output_56=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea160>, output_8=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea198>, output_58=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea1d0>, output_35=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea208>)
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - PIPELINE_START - Started execution of pipeline "dynamic_output_at_runtime".
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - ENGINE_EVENT - Executing steps in process (pid: 9456)
 event_specific_data = {"metadata_entries": [["pid", null, ["9456"]], ["step_keys", null, ["{'ints_for_all.compute'}"]]]}
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_START - Started execution of step "ints_for_all.compute".
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_56
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_56" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_56"], "type_check_data": [true, "output_56", null, []]}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_8
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_8" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_8"], "type_check_data": [true, "output_8", null, []]}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_58
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_58" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_58"], "type_check_data": [true, "output_58", null, []]}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_35
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_35" of type "Any". (Type check passed).
 event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_35"], "type_check_data": [true, "output_35", null, []]}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_SUCCESS - Finished execution of step "ints_for_all.compute" in 2.17ms.
 event_specific_data = {"duration_ms": 2.166192003642209}
               solid = "ints_for_all"
    solid_definition = "ints_for_all"
            step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - ENGINE_EVENT - Finished steps in process (pid: 9456) in 3.11ms
 event_specific_data = {"metadata_entries": [["pid", null, ["9456"]], ["step_keys", null, ["{'ints_for_all.compute'}"]]]}
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - PIPELINE_SUCCESS - Finished execution of pipeline "dynamic_output_at_runtime".
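Generating the data outside the solid works because the output names are fixed when the solid is defined, not when it runs. The toy decorator below models only that part: `toy_solid` is hypothetical, not Dagster's API, and simply rejects any output name it was not given at decoration time, the way the code above builds `output_defs_runtime` before `@solid` is applied.

```python
import random
from typing import Callable, Dict, Iterator, List, Tuple

Compute = Callable[[], Iterator[Tuple[str, int]]]


def toy_solid(output_names: List[str]) -> Callable[[Compute], Callable[[], Dict[str, int]]]:
    """Hypothetical decorator: fixes the allowed output names once, at decoration time."""
    declared = frozenset(output_names)

    def wrap(fn: Compute) -> Callable[[], Dict[str, int]]:
        def run() -> Dict[str, int]:
            results: Dict[str, int] = {}
            for name, value in fn():
                # Names that were not known at decoration time are rejected.
                if name not in declared:
                    raise ValueError(f"undeclared output {name!r}")
                results[name] = value
            return results

        return run

    return wrap


# Generated before the decorator runs, like random_set_of_ints above.
nums = sorted({random.randint(1, 100) for _ in range(random.randint(1, 10))})


@toy_solid([f"output_{n}" for n in nums])
def ints_for_all() -> Iterator[Tuple[str, int]]:
    for n in nums:
        yield f"output_{n}", n
```

Calling `ints_for_all()` returns one entry per generated number. If `nums` changes after decoration (for example `nums.append(1000)`), the next call raises `ValueError` because `output_1000` was never declared, a simplified picture of why data produced inside a solid cannot add outputs at run time.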

I hope this helps!

