首页 > 解决方案 > 气流:在下一个任务中获取上一个任务ID

问题描述

我有 2 个任务。第一个,python 运算符计算一些东西,第二个我想在 Http 运算符中使用 python 运算符的输出。这是我的代码:

source_list = ['account', 'sales']

for source_type in source_list:
    t2 = PythonOperator(
                task_id='compute_next_gather_time_for_' + source_type,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
                data=json.dumps({
                    "query": {
                        "start": "{{ task_instance.xcom_pull(task_ids='prev_task_id') }}",
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: True if len(response.json()) == 0 else False,
                log_response=True,
                retries=3
            )

查询:我想在其数据变量中传递 t3 中以前的任务 ID。我不知道该怎么做,因为 t2 任务 ID 不是恒定的。它随着 source_type 的变化而变化。显然,当我尝试时它没有渲染它。

标签: airflow

解决方案


我之前没有在我的任何 DAG 中使用 Jinja 模板,但是我遇到过类似的问题,我需要从具有动态生成的 task_id 的特定任务中检索 XCOM 值。

您可以按照在 T2task_ids中定义的相同方式在 T3 中定义task_id。例如:

source_list = ['account', 'sales']

for source_type in source_list:

    task_id='compute_next_gather_time_for_' + source_type

    t2 = PythonOperator(
                task_id=task_id,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
                data=json.dumps({
                    "query": {
                        "start": "{{ task_instance.xcom_pull(task_ids=task_id) }}",
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: True if len(response.json()) == 0 else False,
                log_response=True,
                retries=3
            )

推荐阅读