airflow - Airflow xcom 以列表格式返回一个字符串,而不仅仅是一个字符串值?
问题描述
我有一个气流运算符,它返回一个字符串值,任务名为“task1”。所以执行后我进入 xcom 并检查 return_value 和它只是一个字符串(下面的屏幕截图)。
然后我有一个名为 task2 的运算符从 task1 的 xcom 值中获取输入,如下所示:
"{{ ti.xcom_pull(task_ids=['task1'],key='return_value')}}"
问题是它得到的值是一个转换为字符串的列表。
xcom 中的值:这只是一个字符串
xcom pull(jinga模板版)返回值: ['this is just a string']
那么有没有办法可以更新上面显示的 xcom pull (jinga 版本)来拉取值?我无法在运算符内部访问它被传递的内容,或者我可以放置一些逻辑将字符串转换为列表,然后仅获取值(但这并不理想,也不是一个选项)。
另外,我认为它的工作提到我尝试做类似的事情,但使用 Python 运算符然后使用 python 代码中的内容执行 xcom pull 并且返回值很好。所以我不确定为什么 xcom pull 使用 Jinja 模板会这样做,我该如何解决这个问题?我希望我可以做一些我不知道的事情来轻松获得我想要的输出。工作的python操作符代码如下(仅供参考......)
def python_code_task3(**context):
value = context['ti'].xcom_pull(task_ids='task1', key='return_value')
logging.info("Value: " + value)
而这段代码只是输出我想要的值这只是一个字符串
我真的只想使用jinga模板版本并让它检索并传入字符串。不是将字符串值作为列表中的一项的列表的字符串表示形式。
解决方案
XCom
您在代码片段中提取的两种方式之间存在细微差别:一种具有task_ids=["task_1"]
(a list arg)而另一种具有task_ids="task_1"
(a str arg)。
使用时的参数类型task_ids
很重要xcom_pull()
。Airflow 将推断,如果您传递一个任务 ID 列表,则应该有多个任务可以从中提取XComs
,并将返回一个包含所有检索到的XComs
. 否则,如果类型只是一个字符串,也就是单个任务 ID,XCom
则返回单个值。这是完成此操作的代码的链接。
还值得注意的是,Jinja 模板化的值默认呈现为字符串。render_template_as_native_obj
但是,使用 Airflow 2.1,您可以True
在 DAG 级别设置调用的参数。这将在适用时将 Jinja 模板化的值呈现为本机 Python 对象(列表、字典等)。更多关于这个概念的信息在这里。
推荐阅读
- typo3 - TYPO3:从 v8.7 更新到 TYPO3 v9.5.19 后,后端工具栏用户设置消失了
- c++ - 将重物插入 std::map
- excel - Yii2:Kartik GridView Excel 导出
- python - 按时间间隔对 Pandas 数据帧数据进行分组
- amazon-web-services - 将 S3 对象复制到另一个 S3 位置 Elastic Beanstalk SSH 设置错误
- performance - 如果 Page Speed Insights 说一件事而 Search Console 说另一件事,如何找到累积布局转换问题
- python - Discord.py-Rewrite 使用后台任务循环状态
- postgresql - 如何在下表的PostgreSQL中将行转换为列
- python - 模型(功能)不返回 EagerTensor
- css - 根据我调用它的位置更改组件的背景颜色