airflow - 气流:在下一个任务中获取上一个任务ID
问题描述
我有 2 个任务。第一个,python 运算符计算一些东西,第二个我想在 Http 运算符中使用 python 运算符的输出。这是我的代码:
source_list = ['account', 'sales']
for source_type in source_list:
t2 = PythonOperator(
task_id='compute_next_gather_time_for_' + source_type,
python_callable=compute_next_gather_time,
provide_context=True,
trigger_rule=TriggerRule.ALL_SUCCESS,
op_args=[source_type],
retries=3
)
t3 = SimpleHttpOperator(
task_id='request_' + source_type + '_report',
method='POST',
http_conn_id='abc',
endpoint=endpoint,
data=json.dumps({
"query": {
"start": "{{ task_instance.xcom_pull(task_ids='prev_task_id') }}",
"stop": str(yesterday),
"fields": [
1
]
}
}),
headers={"Content-Type": "application/json", "Authorization": 'abc'},
response_check=lambda response: True if len(response.json()) == 0 else False,
log_response=True,
retries=3
)
查询:我想在其数据变量中传递 t3 中以前的任务 ID。我不知道该怎么做,因为 t2 任务 ID 不是恒定的。它随着 source_type 的变化而变化。显然,当我尝试时它没有渲染它。
解决方案
我之前没有在我的任何 DAG 中使用 Jinja 模板,但是我遇到过类似的问题,我需要从具有动态生成的 task_id 的特定任务中检索 XCOM 值。
您可以按照在 T2task_ids
中定义的相同方式在 T3 中定义task_id
。例如:
source_list = ['account', 'sales']
for source_type in source_list:
task_id='compute_next_gather_time_for_' + source_type
t2 = PythonOperator(
task_id=task_id,
python_callable=compute_next_gather_time,
provide_context=True,
trigger_rule=TriggerRule.ALL_SUCCESS,
op_args=[source_type],
retries=3
)
t3 = SimpleHttpOperator(
task_id='request_' + source_type + '_report',
method='POST',
http_conn_id='abc',
endpoint=endpoint,
data=json.dumps({
"query": {
"start": "{{ task_instance.xcom_pull(task_ids=task_id) }}",
"stop": str(yesterday),
"fields": [
1
]
}
}),
headers={"Content-Type": "application/json", "Authorization": 'abc'},
response_check=lambda response: True if len(response.json()) == 0 else False,
log_response=True,
retries=3
)
推荐阅读
- firebase - 是否可以在 Firebase 云消息传递中删除或更新推送通知?
- java - 带领吃豆人到点
- mysql - 在两个表之间选择具有最大日期的行
- python - Pycryptodome AES_OCB 加密随机数如何添加随机数?
- c - c中结构的动态数组(malloc,realloc)
- svg - SVG 矩形填充问题
- mysql - MySQL UDF 仅适用于 `IF` 中的 `WHERE` 子句
- javascript - 如何限制每个用户的 API 调用?漏桶算法
- java - 如何检查流程中的交易正在运行什么合约?
- javascript - Jest 在我的测试中找不到导入的组件