首页 > 解决方案 > 从 xcom 气流任务实例数据创建 json

问题描述

我在气流 dag 中有两个任务。第一个任务点击 POST URL,它的结果成为第二个任务的参数。我可以点击第一个 URL 并将其发送到另一个任务。问题是,我无法从第二个中创建正确的 json。


def create_dag(dag_id,
               schedule,
               default_args):
    def task1(**kwargs):
        res = requests.post('some url')
        return res

    def task2(**kwargs):
        ti = kwargs['ti']
        v1 = ti.xcom_pull(key=None, task_ids='taks1')
        for rec in v1:
            decode = rec.decode("utf-8")
            logger.error(decode)

第一个任务返回 json:

{
    "data": [
        {
            "id": 36,
            "idPercent": 12.67605633802817,
            "idPerson": [
                "Washburn"
            ]
        },
        {
            "id": 37,
            "idPercent": 13.028169014084508,
            "idPerson": [
                "Nicole"
            ]
        }
    ]
}

和 rec.decode("utf-8") 打印我一样。如果我在 rec.decode("utf-8") 上执行 json.loads,我就会开始出错。我想创建正确的 json,然后为数据的每个数组值发送一个 POST 请求。

我想要类似的东西:

for rec in v1['data']:
     requests.post(url, rec)

但我无法创建 json 并从中提取数据。

标签: python-3.xairflow

解决方案


在 Airflow 中,xcom 是作为字符串传递的,在您的情况下v1将是一个字符串,即使它是从 xcom 对象加载的。解决此问题的一种方法是使用ast将字符串值转换为“正确”类型的模块。请注意,有时由于标点符号或类似符号,它很难解释正确的类型。

import ast

v1 = ast.literal_eval(ti.xcom_pull(key=None, task_ids='taks1'))

推荐阅读