首页 > 解决方案 > Airflow xcom 以列表格式返回一个字符串,而不仅仅是一个字符串值?

问题描述

我有一个气流运算符,它返回一个字符串值,任务名为“task1”。所以执行后我进入 xcom 并检查 return_value 和它只是一个字符串(下面的屏幕截图)。

xcom 键/值截图

然后我有一个名为 task2 的运算符从 task1 的 xcom 值中获取输入,如下所示:

"{{ ti.xcom_pull(task_ids=['task1'],key='return_value')}}"

问题是它得到的值是一个转换为字符串的列表。

xcom 中的值:这只是一个字符串

xcom pull(jinga模板版)返回值: ['this is just a string']

那么有没有办法可以更新上面显示的 xcom pull (jinga 版本)来拉取值?我无法在运算符内部访问它被传递的内容,或者我可以放置一些逻辑将字符串转换为列表,然后仅获取值(但这并不理想,也不是一个选项)。

另外,我认为它的工作提到我尝试做类似的事情,但使用 Python 运算符然后使用 python 代码中的内容执行 xcom pull 并且返回值很好。所以我不确定为什么 xcom pull 使用 Jinja 模板会这样做,我该如何解决这个问题?我希望我可以做一些我不知道的事情来轻松获得我想要的输出。工作的python操作符代码如下(仅供参考......)

def python_code_task3(**context): 
value = context['ti'].xcom_pull(task_ids='task1', key='return_value') 
logging.info("Value: " + value)

而这段代码只是输出我想要的值这只是一个字符串

我真的只想使用jinga模板版本并让它检索并传入字符串。不是将字符串值作为列表中的一项的列表的字符串表示形式。

标签: airflow

解决方案


XCom您在代码片段中提取的两种方式之间存在细微差别:一种具有task_ids=["task_1"](a list arg)而另一种具有task_ids="task_1"(a str arg)。

使用时的参数类型task_ids很重要xcom_pull()。Airflow 将推断,如果您传递一个任务 ID 列表,则应该有多个任务可以从中提取XComs,并将返回一个包含所有检索到的XComs. 否则,如果类型只是一个字符串,也就是单个任务 ID,XCom则返回单个值。这是完成此操作的代码的链接

还值得注意的是,Jinja 模板化的值默认呈现为字符串。render_template_as_native_obj但是,使用 Airflow 2.1,您可以True在 DAG 级别设置调用的参数。这将在适用时将 Jinja 模板化的值呈现为本机 Python 对象(列表、字典等)。更多关于这个概念的信息在这里


推荐阅读