首页 > 解决方案 > 如何从气流中 xcom_pull 一个 return_value 并作为字典传递?

问题描述

我有一个 json 配置文件,其中列出了要运行的潜在任务。从这个配置文件中,我想创建一个新配置,它只包含那些 run_date 与我的 DAG 执行日期匹配的任务。从那里,我希望 DAG 评估它是否需要运行任何东西,如果需要,特别是它需要运行什么。

我的配置看起来像:

config = {
    "task_1": {
        "run_date": "2021-05-01",
        "other_key": "other_value"
        },
    "task_2": {
        "run_date": "2021-05-02",
        "other_key": "other_value"
        }
    }

我的 dag 导入了上面的配置文件,看起来像:

dag = DAG(
    dag_id='test',
    default_args=args,
    catchup=False,
    schedule_interval='0 16 * * *'
)

tasks_to_run = BranchPythonOperator(
    task_id='tasks_to_run',
    python_callable=tasks_to_run,
    op_kwargs={'config': config, 'execution_date': "{{ ds }}"},
    dag=dag
)

new_config = PythonOperator(
    task_id='new_config',
    provide_context=True,
    python_callable=new_config,
    op_kwargs={'config': config, 'execution_date': "{{ ds }}"},
    dag=dag
)

next_task = PythonOperator(
    task_id='next_task',
    provide_context=True,
    python_callable=next_task,
    op_kwargs={'tasks': "{{ task_instance.xcom_pull(dag_id='test', task_ids='new_config', key='return_value') }}"}
)

'new_config' 生成新的配置文件,'next_task' 尝试提取 xcom 值。当 next_task 将 xcom return_value 传递给 python_callable 'next_task' 时,它会失败并显示:

TypeError:字符串索引必须是整数

如何将 xcom return_value 作为字典传递给 python callable 'next_task'?因为那是它所期望的。

标签: pythonairflow

解决方案


推荐阅读