python - 如何从气流中 xcom_pull 一个 return_value 并作为字典传递?
问题描述
我有一个 json 配置文件,其中列出了要运行的潜在任务。从这个配置文件中,我想创建一个新配置,它只包含那些 run_date 与我的 DAG 执行日期匹配的任务。从那里,我希望 DAG 评估它是否需要运行任何东西,如果需要,特别是它需要运行什么。
我的配置看起来像:
config = {
"task_1": {
"run_date": "2021-05-01",
"other_key": "other_value"
},
"task_2": {
"run_date": "2021-05-02",
"other_key": "other_value"
}
}
我的 dag 导入了上面的配置文件,看起来像:
dag = DAG(
dag_id='test',
default_args=args,
catchup=False,
schedule_interval='0 16 * * *'
)
tasks_to_run = BranchPythonOperator(
task_id='tasks_to_run',
python_callable=tasks_to_run,
op_kwargs={'config': config, 'execution_date': "{{ ds }}"},
dag=dag
)
new_config = PythonOperator(
task_id='new_config',
provide_context=True,
python_callable=new_config,
op_kwargs={'config': config, 'execution_date': "{{ ds }}"},
dag=dag
)
next_task = PythonOperator(
task_id='next_task',
provide_context=True,
python_callable=next_task,
op_kwargs={'tasks': "{{ task_instance.xcom_pull(dag_id='test', task_ids='new_config', key='return_value') }}"}
)
'new_config' 生成新的配置文件,'next_task' 尝试提取 xcom 值。当 next_task 将 xcom return_value 传递给 python_callable 'next_task' 时,它会失败并显示:
TypeError:字符串索引必须是整数
如何将 xcom return_value 作为字典传递给 python callable 'next_task'?因为那是它所期望的。
解决方案
推荐阅读
- node.js - 如何处理分布在 NodeJS 中不同文件中的验证脚本?
- ruby-on-rails - 在 Rails 和 Sinatra 应用程序之间共享 Gem
- sql - SQL Server 查询以匹配确切的单词
- cookies - 在 SPA 上存储身份验证令牌的最佳方式
- java - 从声纳的“新代码覆盖率”中排除 javascript 文件
- mysql - 计算排名不起作用 - Mysql
- angularjs - 从 web api 控制器将 xml 传递给 angular
- python - 需要不和谐机器人的帮助
- python - 正则表达式匹配下一行中的单词
- python - Python Sum if:使用来自不同 pandas 列的数据