首页 > 解决方案 > Spark导入拆分JSON有效负载

问题描述

我有一个 json 结构,其中包含一些顶级元数据和一个有效负载,它等效于 pandas 定义为拆分方向 json 有效负载的内容。当我们摄取大量这些文件时,这样做是为了减少重复。

通常在 pandas 中,我会加载 json 文件并将所需的部分(索引、列名和数据)传递给 DataFrame 构造函数,这将为我提供一个易于使用的平面表,然后可以导出到 influxdb 或 sql。

obj = load_json('file.json')

df = pd.DataFrame(index=obj['payload']['Time'], columns=obj['Names']['Time'], data=obj['payload']['Data'])
df['Machine_ID'] = obj['Machine_ID']
df['TimeSend'] = obj['TimeSend']
df['Version'] = obj['Version']

似乎这个模式不容易用 spark 展平,因为数据不是基于记录的,所以列名和数据没有关联。无论如何我可以用火花将这个处理成平面模式,还是应该在我的管道中添加一个额外的熊猫处理步骤。

root
 |-- Machine_ID: string (nullable = true)
 |-- TimeSend: string (nullable = true)
 |-- Version: long (nullable = true)
 |-- Payload: struct (nullable = true)
 |    |-- Data: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |-- Names: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Time: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

编辑:我找到了一种可行的方法,但是我很好奇是否可以依赖排序,因为我在添加 id 之前拆分了数据框。可能最好拉上拉链TimeData让它们都能被炸开,然后从那里开始工作。

# Make flattened dataframe
df_ = df.select(col('Payload.Time').alias('Time'), col('Payload.Names').alias('Names'), col('Payload.Data').alias('Data'), col('Machine_ID'), col('TimeSend'), col('Version'))

# Make exploded `Data` table
columns = df_.rdd.flatMap(lambda x: x.Names).collect()
df_a = df_.select(explode(col('Data')))
df_a = df_a.select([df_a.col[x] for x in range(len(columns))])
df_a = rdd_data.toDF(columns)
df_a = df_a.withColumn("id", monotonically_increasing_id())

# Make exploded `Metadata` table
df_b = df_.select(explode(col('Time')), col('Machine_ID'), col('TimeSend'), col('Version'))
df_b = df_b.withColumn("id", monotonically_increasing_id())

# Join tables
df_c = df_a.join(df_b, "id")

# Schema is now flattened and joined
df_c.printSchema()
root
 |-- id: long (nullable = false)
 |-- Machine_ID: string (nullable = true)
 |-- TimeSend: string (nullable = true)
 |-- Version: long (nullable = true)
 |-- Index: string (nullable = true) <- From Payload.Time
 |-- TagA: double (nullable = true)  <- From Payload.Names & Data
 |-- TagB: double (nullable = true)  <- || -
 |-- TagC: double (nullable = true)  <- || -
 |-- TagD: double (nullable = true)  <- || -

标签: jsonpandaspysparkapache-spark-sql

解决方案


推荐阅读