首页 > 解决方案 > 从 PySpark 数据帧(结构化流数据)中的嵌套结构中分解任意数量的 JSON 字段

问题描述

我正在通过 IoT 和 API 设备在 Databricks 结构化流环境中处理一些流数据。

我的架构如下:

    root
 |-- body: string (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- idNumber: long (nullable = true)
 |-- publishTime: timestamp (nullable = true)
 |-- make: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- properties_sys: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- date: date (nullable = true)

我的正文列包含嵌套的 json,可以是一到两层深

IE

data = { 'id': {['device_1' : ...,
                'device_2' : ...]..}..}

或者

data = {'id' : 1,
         'temp' : 28}

我有一些函数可以帮助我推导出进入我的 databricks 环境的每个唯一流的 strucTypes 应该是什么,但这是基于我需要事先在配置文件中定义的静态列表。

如果我们想象这个物联网设备在运行我的函数后测量天气,我会得到(这是body专栏的结构)

 schema = [StructField(humidity,FloatType,true),
 StructField(rainfall,LongType,true),
 StructField(timestamp,DateType,true),
 StructField(city,StringType,true)]

现在要分解嵌套列,我只需手动使用以下代码行:

parsed_df = df.withColumn("body", from_json("body", schema)).select(col('partition'), 
col('offset'), col('idNumber'),col('publishTime'),col('make'),col('partitionKey'),
col('properties'),col('properties_sys'),col('date'),col('body.*'))

这完美地解除了我的 json 嵌套,但是由于我的 IoT 设备与一些 API 数据混合,我可能会在我的body列中获得一个没有的新字段,StructType因此我无法事先手动定义模式。

所以我的问题是,有没有一种方法来定义动态explode或任何人用来创建动态模式的方法,我可以事先存储/使用?

标签: pythonpysparkdatabricks

解决方案


推荐阅读