python - 如何将 dict 列从 pyspark DF 转换/分解为行
问题描述
我有具有以下结构的 json 文件:
{
"name": {
"0": "name1",
"1": "name2",
"2": "name3"
},
"id": {
"0": "00001",
"1": "00002",
"2": "00013"
}
}
当我读取此 json 文件以触发 DF(使用 python)时,我在每列收到带有字典的 DF:
schema = StructType([
StructField("name",StringType(),True),
StructField("id",StringType(),True)
])
spark_df = spark.read.schema(schema).json('path_to_json_file', multiLine=True)
spark_df.show()
+-------------------------------------+-------------------------------------+
| name | id |
+-------------------------------------+-------------------------------------+
|{"0":"name1","1":"name2","2":"name3"}|{"0":"00001","1":"00002","2":"00013"}|
+-------------------------------------+-------------------------------------+
如何分解每列以仅具有值:
+------+-----+
| name | id |
+------+-----+
|name1 |00001|
+------+-----+
|name2 |00002|
+------+-----+
|name3 |00013|
+------+-----+
我尝试使用explode
函数但收到错误:
from pyspark.sql import functions as f
spark_df.select('*', f.explode('id').alias('id')).show()
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve 'explode(`id`)' due to data type mismatch: input to function explode should be array or map type, not string;;\n'Project [name#860, id#861, explode(id#860) AS id#947]\n+- Relation[name#860,id#861] json\n"
我也尝试from_json
了函数,但为此我必须定义一个内部模式,这是我无法做到的,因为值的数量是未知的。我尝试了这个模式,但只收到空值。
schema = StructType([StructField('key1', StringType(), True)])
基本上我所知道的只是上面的键名(应该成为字段名),但我将获得的记录数是未知的。
解决方案
首先,您的输入模式是错误的。用一个改变它MapType
:
schm= T.StructType(
[
T.StructField("name", T.MapType(T.StringType(), T.StringType()), True),
T.StructField("id", T.MapType(T.StringType(), T.StringType()), True),
]
)
df = spark.read.schema(schm).json("path_to_json_file", multiLine=True)
df.printSchema()
root
|-- json: struct (nullable = true)
| |-- name: map (nullable = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
| |-- id: map (nullable = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
然后,假设两者name
和id
具有相同数量的输入:
df.withColumn("key", F.explode(F.map_keys("json.name"))).select(
F.col("json.name").getItem(F.col("key")).alias("name"),
F.col("json.id").getItem(F.col("key")).alias("id"),
).show()
+-----+-----+
| name| id|
+-----+-----+
|name1|00001|
|name2|00002|
|name3|00013|
+-----+-----+
推荐阅读
- css - 使用 css3 flex 当孩子少一个时如何自动留下子元素
- firebase - firebase 未在本机反应中定义
- image - 使用 imagemagick 从 YUV(UYVY) 转换为 RGB
- laravel - Laravel $request 值为空
- c++ - 如何将 C++ 项目从 32 位迁移到 64 位以支持 MAC OS 10.14(Mojave)?
- command - 如何在启动时在 Plesk 托管服务器上启动作业
- python - 如何使用 cacti 监控节点 - 使用脚本(shell ..)收集数据 没有 SNMP MIB
- java - SqlRowSet 返回 null,但为什么呢?
- mysql - 选择花费 20 秒?如果我删除订单将只需要 0,020
- javascript - 通过按钮调用的脚本调用表单