首页 > 解决方案 > 气流如何将 xcom 结果传递给 SubDagOperator,它将确定 subdag 中 BashOperator 的数量?

问题描述

你能帮我解决这个问题吗?我有以下代码:我之前定义了:DAG_NAME、SUBDAG_ID、default_args。我想知道我是否缺少任何参数,例如 provide_context 或其他参数,或者是否支持/不支持。

p_dag = DAG(dag_id=DAG_NAME,
          default_args=default_args,
          schedule_interval='@once',
          provide_context=True)

step1 = BashOperator(
    task_id='xcom_feed',
    bash_command=COMMAND,
    dag=p_dag,
    xcom_push=True)

def dag_subdag(parent_dag=DAG_NAME, child_dag=SUBDAG_ID,
               args=default_args, **kwargs):
    sdag = DAG(dag_id=f"{parent_dag}.{child_dag}", default_args=args,
               schedule_interval="@once", provide_context=True)
    ti = kwargs['ti']
    value = ti.xcom_pull(task_ids='xcom_feed')
    value = int(literal_eval(value)[0])
    for i in range(value):
        BashOperator(task_id=f"{child_dag}-task-{i}",
                     bash_command=COMMAND_ON_{i},
                     dag=sdag)
    return sdag

step2 = SubDagOperator(task_id=SUBDAG_ID,
       subdag=dag_subdag(DAG_NAME, SUBDAG_ID, default_args),
       dag=p_dag)

step1 >> step2

可悲的是,我将其作为 PythonOperator 运行,并在定义 subdag bash 运算符之前打印了 dag 和 sdag 的类型,并且它按预期执行而没有错误,告诉我它返回一个 DAG,但是当我运行这个时,它在我身上爆炸了:错误的最后一部分是

/airflow/models/dag.py", line 1538, in create_dagrun
    return self.get_dag().create_dagrun(run_id=run_id,
AttributeError: 'NoneType' object has no attribute 'create_dagrun'

标签: airflow

解决方案


推荐阅读