首页 > 解决方案 > 在 Foundry 中,如何解析具有 JSON 响应的数据框列

问题描述

我正在尝试使用外部 API 将 JIRA 数据引入 Foundry。当它通过 Magritte 进入时,数据会存储在 AVRO 中,并且有一个名为 response 的列。响应列的数据如下所示...

[{"id":"customfield_5","name":"test","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["cf[5]","test"],"schema":{"type":"user","custom":"com.atlassian.jira.plugin.system.customfieldtypes:userpicker","customId":5}},{"id":"customfield_2","name":"test2","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["test2","cf[2]"],"schema":{"type":"option","custom":"com.atlassian.jira.plugin.system.customfieldtypes:select","customId":2}}]

由于这是作为 AVRO 导入的,因此有关如何转换 Foundry 中的这些数据的文档不起作用。如何将此数据转换为单独的列和行?

这是我尝试使用的代码:

from transforms.api import transform_df, Input, Output
from pyspark import SparkContext as sc
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
import json
import pyspark.sql.types as T


@transform_df(
    Output("json output"),
    json_raw=Input("json input"),
)
def my_compute_function(json_raw, ctx):

    sqlContext = SQLContext(sc)

    source = json_raw.select('response').collect()  # noqa

    # Read the list into data frame
    df = sqlContext.read.json(sc.parallelize(source))

    json_schema = T.StructType([
        T.StructField("id", T.StringType(), False),
        T.StructField("name", T.StringType(), False),
        T.StructField("custom", T.StringType(), False),
        T.StructField("orderable", T.StringType(), False),
        T.StructField("navigable", T.StringType(), False),
        T.StructField("searchable", T.StringType(), False),
        T.StructField("clauseNames", T.StringType(), False),
        T.StructField("schema", T.StringType(), False)
    ])

    udf_parse_json = udf(lambda str: parse_json(str), json_schema)

    df_new = df.select(udf_parse_json(df.response).alias("response"))

    return df_new


# Function to convert JSON array string to a list
def parse_json(array_str):
    json_obj = json.loads(array_str)
    for item in json_obj:
        yield (item["a"], item["b"])

标签: palantir-foundry

解决方案


使用F.from_json函数可以轻松地将字符串列中的 Json 解析为结构列(然后解析为单独的列) 。

在您的情况下,您需要执行以下操作:

df = df.withColumn("response_parsed", F.from_json("response", json_schema))

然后您可以执行此操作或类似操作以将内容放入不同的列:

df = df.select("response_parsed.*")

但是,这不起作用,因为您的架构不正确,实际上每行中都有一个 json 结构列表,而不仅仅是 1,因此您需要T.ArrayType(your_schema)对整个事物进行包装,您还需要执行F.explode在选择之前,将每个数组元素放在自己的行中。

另一个有用的函数是F.get_json_object,它允许您从 json 字符串中获取 json 一个 json 对象。

像您所做的那样使用 UDF 是可行的,但 UDF 通常比原生 spark 函数的性能要低得多。

此外,在这种情况下,所有 AVRO 文件格式都是将多个 json 文件合并到一个大文件中,每个文件都在自己的行中,因此“Rest API 插件”-“在 Foundry 中处理 JSON”下的示例应该可以正常工作当您跳过“将此模式放在原始数据集上”步骤时。


推荐阅读