python - Pyspark 将嵌套结构字段转换为 Json 字符串
问题描述
我正在尝试使用 pyspark 将一些 mongo 集合摄取到大查询中。架构看起来像这样。
root
|-- groups: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- my_field: struct (nullable = true)
| | | |-- **{ mongo id }**: struct (nullable = true)
| | | | |-- A: timestamp (nullable = true)
| | | | |-- B: string (nullable = true)
| | | | |-- C: struct (nullable = true)
| | | | | |-- abc: boolean (nullable = true)
| | | | | |-- def: boolean (nullable = true)
| | | | | |-- ghi: boolean (nullable = true)
| | | | | |-- xyz: boolean (nullable = true)
问题是在 my_field 中我们存储了 id,每个组都有自己的 id,当我将所有内容导入到大查询中时,我最终会为每个 id 生成一个新列。我想将 my_field 转换为字符串并将所有嵌套字段存储为 json 或类似的东西。但是当我尝试转换它时,我收到了这个错误
temp_df = temp_df.withColumn("groups.my_field", col("groups.my_field").cast('string'))
TypeError: Column is not iterable
我错过了什么?
解决方案
所以事实证明,为了追加/删除/重命名嵌套字段,您需要更改架构。我不知道。所以这是我的答案。我从这里复制并修改了代码https://stackoverflow.com/a/48906217/984114以使其适用于我的架构
这是“exclude_nested_field”的修改版本
def change_nested_field_type(schema, fields_to_change, parent=""):
new_schema = []
if isinstance(schema, StringType):
return schema
for field in schema:
full_field_name = field.name
if parent:
full_field_name = parent + "." + full_field_name
if full_field_name not in fields_to_change:
if isinstance(field.dataType, StructType):
inner_schema = change_nested_field_type(field.dataType, fields_to_change, full_field_name)
new_schema.append(StructField(field.name, inner_schema))
elif isinstance(field.dataType, ArrayType):
inner_schema = change_nested_field_type(field.dataType.elementType, fields_to_change, full_field_name)
new_schema.append(StructField(field.name, ArrayType(inner_schema)))
else:
new_schema.append(StructField(field.name, field.dataType))
else:
# Here we change the field type to String
new_schema.append(StructField(field.name, StringType()))
return StructType(new_schema)
这就是我调用函数的方式
new_schema = ArrayType(change_nested_field_type(df.schema["groups"].dataType.elementType, ["my_field"]))
df = df.withColumn("json", to_json("groups")).drop("groups")
df = df.withColumn("groups", from_json("json", new_schema)).drop("json")
推荐阅读
- struct - 有没有办法根据特征实现创建结构?
- angular - 具有通用组件的角度列表作为列表项
- javascript - 从数组中排除重复项
- swagger - Spring Boot 中的 Swaggerhub 集成
- javascript - 事件发生变化
- python - log2元素的两个numpy矩阵之间的矩阵乘法方法
- angular - 当下方空间不足时,Mat-menu 不会自动更改位置
- python - Pandas DataFrame 中是否有比我正在做的更有效的转置?
- flutter - 如何使用 Dio 将多个图像上传到服务器?
- python - Tkinter 之间的区别
和 < >