首页 > 解决方案 > 如何从同一 DAG 运行中的其他任务实例中提取 xcom 值(不是最新的)?

问题描述

我有 3 次 DAG 运行:

  1. DAGR 1 于 2019-02-13 16:00:00 执行
  2. DAGR 2 于 2019-02-13 17:00:00 执行
  3. DAGR 3 于 2019-02-13 18:00:00 执行

在我想获取任务实例X的xcom 值的任务实例中。我这样做了:DAGR 1Y

kwargs['task_instance'].xcom_pull(task_ids='Y')

我希望YDAGR 1. 相反,我从DAGR 3.

来自气流文档

如果xcom_pull为 传递单个字符串task_ids,则返回该任务的最新 XCom 值;...

  1. 为什么 Airflowxcom_pull返回最新的 xcom 值?
  2. 如果我想从同一个 DAG 运行中提取怎么办?

标签: airflow

解决方案


这回答了您的问题[如何在同一个 DAG 运行(不是最新的)中从其他任务实例中提取 xcom 值?]
请参见下面的示例

t1 = SomeOperator(
        task_id='Your_t1_Task_ID',
        xcom_push = True,
        ...
        ...
        dag=dag)

    def get_records(**kwargs):
        ti = kwargs['ti']
        xcom = ti.xcom_pull(task_ids='Your_t1_Task_ID')
        string_to_print = 'Value in xcom is: {}'.format(xcom)
        #string_to_print holds that value, you can also print it in the logs
        logging.info(string_to_print)

    t2 = PythonOperator(
        task_id='records',
        provide_context=True,
        python_callable=get_records,
        dag=dag)

    t1 >> t2

推荐阅读