palantir-foundry - 在 Foundry 中,如何解析具有 JSON 响应的数据框列
问题描述
我正在尝试使用外部 API 将 JIRA 数据引入 Foundry。当它通过 Magritte 进入时,数据会存储在 AVRO 中,并且有一个名为 response 的列。响应列的数据如下所示...
[{"id":"customfield_5","name":"test","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["cf[5]","test"],"schema":{"type":"user","custom":"com.atlassian.jira.plugin.system.customfieldtypes:userpicker","customId":5}},{"id":"customfield_2","name":"test2","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["test2","cf[2]"],"schema":{"type":"option","custom":"com.atlassian.jira.plugin.system.customfieldtypes:select","customId":2}}]
由于这是作为 AVRO 导入的,因此有关如何转换 Foundry 中的这些数据的文档不起作用。如何将此数据转换为单独的列和行?
这是我尝试使用的代码:
from transforms.api import transform_df, Input, Output
from pyspark import SparkContext as sc
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
import json
import pyspark.sql.types as T
@transform_df(
Output("json output"),
json_raw=Input("json input"),
)
def my_compute_function(json_raw, ctx):
sqlContext = SQLContext(sc)
source = json_raw.select('response').collect() # noqa
# Read the list into data frame
df = sqlContext.read.json(sc.parallelize(source))
json_schema = T.StructType([
T.StructField("id", T.StringType(), False),
T.StructField("name", T.StringType(), False),
T.StructField("custom", T.StringType(), False),
T.StructField("orderable", T.StringType(), False),
T.StructField("navigable", T.StringType(), False),
T.StructField("searchable", T.StringType(), False),
T.StructField("clauseNames", T.StringType(), False),
T.StructField("schema", T.StringType(), False)
])
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
df_new = df.select(udf_parse_json(df.response).alias("response"))
return df_new
# Function to convert JSON array string to a list
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
解决方案
使用F.from_json函数可以轻松地将字符串列中的 Json 解析为结构列(然后解析为单独的列) 。
在您的情况下,您需要执行以下操作:
df = df.withColumn("response_parsed", F.from_json("response", json_schema))
然后您可以执行此操作或类似操作以将内容放入不同的列:
df = df.select("response_parsed.*")
但是,这不起作用,因为您的架构不正确,实际上每行中都有一个 json 结构列表,而不仅仅是 1,因此您需要T.ArrayType(your_schema)
对整个事物进行包装,您还需要执行F.explode在选择之前,将每个数组元素放在自己的行中。
另一个有用的函数是F.get_json_object,它允许您从 json 字符串中获取 json 一个 json 对象。
像您所做的那样使用 UDF 是可行的,但 UDF 通常比原生 spark 函数的性能要低得多。
此外,在这种情况下,所有 AVRO 文件格式都是将多个 json 文件合并到一个大文件中,每个文件都在自己的行中,因此“Rest API 插件”-“在 Foundry 中处理 JSON”下的示例应该可以正常工作当您跳过“将此模式放在原始数据集上”步骤时。
推荐阅读
- django - Djangop Heroku,发送电子邮件时,我们收到错误500,如何解决?
- c# - Unity - 如何从场景 1 传输对象并在场景 2 中实例化它?
- reactjs - 使用 redux 响应身份验证
- discord.py-rewrite - 如何获得具有特定权限的所有角色
- python - 如何使用带有 Python Sagemaker SDK 的 TensorFlow 估计器指定最大运行时间?
- mysql - 主键和规范化
- c# - 将文本框绑定到 List<>[i]
- php - Wordpress PHP 7.0 到 7.3 在 gcp 上的 Wordpress
- nginx - SAMEORIGIN VS CORS
- php - 将重力形式钩子变量传递给另一个重力形式钩子