airflow - 如何从同一 DAG 运行中的其他任务实例中提取 xcom 值(不是最新的)?
问题描述
我有 3 次 DAG 运行:
- DAGR 1 于 2019-02-13 16:00:00 执行
- DAGR 2 于 2019-02-13 17:00:00 执行
- DAGR 3 于 2019-02-13 18:00:00 执行
在我想获取任务实例X
的xcom 值的任务实例中。我这样做了:DAGR 1
Y
kwargs['task_instance'].xcom_pull(task_ids='Y')
我希望Y
从DAGR 1
. 相反,我从DAGR 3
.
来自气流文档
如果
xcom_pull
为 传递单个字符串task_ids
,则返回该任务的最新 XCom 值;...
- 为什么 Airflow
xcom_pull
返回最新的 xcom 值? - 如果我想从同一个 DAG 运行中提取怎么办?
解决方案
这回答了您的问题[如何在同一个 DAG 运行(不是最新的)中从其他任务实例中提取 xcom 值?]
请参见下面的示例:
t1 = SomeOperator(
task_id='Your_t1_Task_ID',
xcom_push = True,
...
...
dag=dag)
def get_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='Your_t1_Task_ID')
string_to_print = 'Value in xcom is: {}'.format(xcom)
#string_to_print holds that value, you can also print it in the logs
logging.info(string_to_print)
t2 = PythonOperator(
task_id='records',
provide_context=True,
python_callable=get_records,
dag=dag)
t1 >> t2
推荐阅读
- javascript - REST API 一次只显示一个对象
- git - 为 BitBucket 上的一位特定用户授予对一个存储库的访问权限
- c - Valgrind“大小为 1 的无效写入”
- javascript - 验证滚动时满足条件,然后只运行一次函数
- laravel - 如何通过 Laravel 5.8 中的 Auth:: 类从用户的依赖表中检索字段?
- python - "查询集没有属性
" 但我没有进行任何字段查找 - ios - IOS TVML 应用程序中的搜索功能
- javascript - 复制一个 div 的大小尺寸并将其应用于另一个 div。它添加了错误的尺寸吗?
- clion - 如何查看编译器输出?
- swift - 为什么navigationBar隐藏collectionView?