python-3.x - 从 xcom 气流任务实例数据创建 json
问题描述
我在气流 dag 中有两个任务。第一个任务点击 POST URL,它的结果成为第二个任务的参数。我可以点击第一个 URL 并将其发送到另一个任务。问题是,我无法从第二个中创建正确的 json。
def create_dag(dag_id,
schedule,
default_args):
def task1(**kwargs):
res = requests.post('some url')
return res
def task2(**kwargs):
ti = kwargs['ti']
v1 = ti.xcom_pull(key=None, task_ids='taks1')
for rec in v1:
decode = rec.decode("utf-8")
logger.error(decode)
第一个任务返回 json:
{
"data": [
{
"id": 36,
"idPercent": 12.67605633802817,
"idPerson": [
"Washburn"
]
},
{
"id": 37,
"idPercent": 13.028169014084508,
"idPerson": [
"Nicole"
]
}
]
}
和 rec.decode("utf-8") 打印我一样。如果我在 rec.decode("utf-8") 上执行 json.loads,我就会开始出错。我想创建正确的 json,然后为数据的每个数组值发送一个 POST 请求。
我想要类似的东西:
for rec in v1['data']:
requests.post(url, rec)
但我无法创建 json 并从中提取数据。
解决方案
在 Airflow 中,xcom 是作为字符串传递的,在您的情况下v1
将是一个字符串,即使它是从 xcom 对象加载的。解决此问题的一种方法是使用ast
将字符串值转换为“正确”类型的模块。请注意,有时由于标点符号或类似符号,它很难解释正确的类型。
import ast
v1 = ast.literal_eval(ti.xcom_pull(key=None, task_ids='taks1'))
推荐阅读
- html - 调整窗口大小导致文本下推图像
- api - 来自 WebApp 的 Hyperledger Composer Rest 服务器查询
- java - 如果我们在指定时间内没有得到响应,如何关闭 API 连接
- elasticsearch - 在 elasticsearch 中哪个更好的滚动或 search_after 来模拟随机分页?
- c# - 路由到控制器中的类对象
- iterator - 如何通过小数据集进行采样以进行比数据大小更多的迭代?
- apache-spark-sql - 如何使用 Spark 中的 scala 将 RDD[DataFrame] 中的所有 DataFrame 合并到 DataFrame 而不使用 for 循环?
- python-3.x - ImportError:没有名为 xlrd 的模块
- python - django单页上的两部分表单
- java - 嵌套 if 或 code 多次?