python - 从 PySpark 数据帧(结构化流数据)中的嵌套结构中分解任意数量的 JSON 字段
问题描述
我正在通过 IoT 和 API 设备在 Databricks 结构化流环境中处理一些流数据。
我的架构如下:
root
|-- body: string (nullable = true)
|-- partition: string (nullable = true)
|-- offset: string (nullable = true)
|-- idNumber: long (nullable = true)
|-- publishTime: timestamp (nullable = true)
|-- make: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- properties_sys: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- date: date (nullable = true)
我的正文列包含嵌套的 json,可以是一到两层深
IE
data = { 'id': {['device_1' : ...,
'device_2' : ...]..}..}
或者
data = {'id' : 1,
'temp' : 28}
我有一些函数可以帮助我推导出进入我的 databricks 环境的每个唯一流的 strucTypes 应该是什么,但这是基于我需要事先在配置文件中定义的静态列表。
如果我们想象这个物联网设备在运行我的函数后测量天气,我会得到(这是body
专栏的结构)
schema = [StructField(humidity,FloatType,true),
StructField(rainfall,LongType,true),
StructField(timestamp,DateType,true),
StructField(city,StringType,true)]
现在要分解嵌套列,我只需手动使用以下代码行:
parsed_df = df.withColumn("body", from_json("body", schema)).select(col('partition'),
col('offset'), col('idNumber'),col('publishTime'),col('make'),col('partitionKey'),
col('properties'),col('properties_sys'),col('date'),col('body.*'))
这完美地解除了我的 json 嵌套,但是由于我的 IoT 设备与一些 API 数据混合,我可能会在我的body
列中获得一个没有的新字段,StructType
因此我无法事先手动定义模式。
所以我的问题是,有没有一种方法来定义动态explode
或任何人用来创建动态模式的方法,我可以事先存储/使用?
解决方案
推荐阅读
- javascript - “Uncaught SyntaxError: Unexpected token l in JSON at position 0”,但仍然有效,为什么?
- rest - 使用像 Nuxt 这样的服务器端渲染来保护 API
- typescript - React Native useState 和 useEffect 交互
- spring - 收件人地址
不是 553 5.1.3 有效的 RFC-5321 地址 - html - Html 必需属性未使用 html 选择标记触发
- python - Scrapy 项目更改列名
- c# - 我可以通过 c# 将数据上传到 firebase 吗?
- python - Python; 使用可变文件名在远程服务器上创建文件
- image - 如何在 Flutter 应用中使用 MQTT 协议下载从另一台设备发送的图像?
- amazon-s3 - 使用 Azure 数据工厂将大量文件 (400k) 从 S3 存储桶下载到 Azure Datalake Gen2