json - Spark导入拆分JSON有效负载
问题描述
我有一个 json 结构,其中包含一些顶级元数据和一个有效负载,它等效于 pandas 定义为拆分方向 json 有效负载的内容。当我们摄取大量这些文件时,这样做是为了减少重复。
通常在 pandas 中,我会加载 json 文件并将所需的部分(索引、列名和数据)传递给 DataFrame 构造函数,这将为我提供一个易于使用的平面表,然后可以导出到 influxdb 或 sql。
obj = load_json('file.json')
df = pd.DataFrame(index=obj['payload']['Time'], columns=obj['Names']['Time'], data=obj['payload']['Data'])
df['Machine_ID'] = obj['Machine_ID']
df['TimeSend'] = obj['TimeSend']
df['Version'] = obj['Version']
似乎这个模式不容易用 spark 展平,因为数据不是基于记录的,所以列名和数据没有关联。无论如何我可以用火花将这个处理成平面模式,还是应该在我的管道中添加一个额外的熊猫处理步骤。
root
|-- Machine_ID: string (nullable = true)
|-- TimeSend: string (nullable = true)
|-- Version: long (nullable = true)
|-- Payload: struct (nullable = true)
| |-- Data: array (nullable = true)
| | |-- element: array (containsNull = true)
| | | |-- element: double (containsNull = true)
| |-- Names: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- Time: array (nullable = true)
| | |-- element: string (containsNull = true)
编辑:我找到了一种可行的方法,但是我很好奇是否可以依赖排序,因为我在添加 id 之前拆分了数据框。可能最好拉上拉链Time
,Data
让它们都能被炸开,然后从那里开始工作。
# Make flattened dataframe
df_ = df.select(col('Payload.Time').alias('Time'), col('Payload.Names').alias('Names'), col('Payload.Data').alias('Data'), col('Machine_ID'), col('TimeSend'), col('Version'))
# Make exploded `Data` table
columns = df_.rdd.flatMap(lambda x: x.Names).collect()
df_a = df_.select(explode(col('Data')))
df_a = df_a.select([df_a.col[x] for x in range(len(columns))])
df_a = rdd_data.toDF(columns)
df_a = df_a.withColumn("id", monotonically_increasing_id())
# Make exploded `Metadata` table
df_b = df_.select(explode(col('Time')), col('Machine_ID'), col('TimeSend'), col('Version'))
df_b = df_b.withColumn("id", monotonically_increasing_id())
# Join tables
df_c = df_a.join(df_b, "id")
# Schema is now flattened and joined
df_c.printSchema()
root
|-- id: long (nullable = false)
|-- Machine_ID: string (nullable = true)
|-- TimeSend: string (nullable = true)
|-- Version: long (nullable = true)
|-- Index: string (nullable = true) <- From Payload.Time
|-- TagA: double (nullable = true) <- From Payload.Names & Data
|-- TagB: double (nullable = true) <- || -
|-- TagC: double (nullable = true) <- || -
|-- TagD: double (nullable = true) <- || -
解决方案
推荐阅读
- javascript - 遇到未捕获的类型错误:无法读取未定义的属性“设置”
- java - java mapstruct 1.3.1忽略双向DTO映射列表中的属性
- r - 在R中制作一个连续数字列表
- javascript - 复选框单击清除 contenteditable div 选择
- forms - 如何在 Contact Form 7 中删除已发送消息的边框,并使整个消息在 5 秒后消失?
- python - 在 SQLAlchemy 中创建动态类
- powershell - PowerShell Invoke-WebRequest 链接问题
- function - plpgsql 函数错误:提示:没有函数匹配给定的名称和参数类型。您可能需要添加显式类型转换
- security - NIFI 认证
- puppeteer - 每个选择器类型的性能如何?