首页 > 解决方案 > Spark:如何从数据框行转换具有多个键的 JSON 字符串?

问题描述

我正在寻求帮助,如何使用 json 结构的多个键解析 json 字符串,请参阅required output.

下面的答案显示了如何用一个转换 JSON 字符串Id

当每个字符串中每个 JSON 字符串的 Id 数量发生变化时,jstr1如何转换 , 中的数千个 Id 。jstr2

当前代码:

jstr1 = """
        {"id_1": [{"a": 1, "b": 2}, {"a": 3, "b": 4}], 
        "id_2": [{"a": 5, "b": 6}, {"a": 7, "b": 8}]}
              """
jstr2 = """
        {"id_3": [{"a": 9, "b": 10}, {"a": 11, "b": 12}], 
         "id_4": [{"a": 12, "b": 14}, {"a": 15, "b": 16}],
         "id_5": [{"a": 17, "b": 18}, {"a": 19, "b": 10}]}
          """

schema = "map<string, array<struct<a:int,b:int>>>"

df = sqlContext.createDataFrame([Row(json=jstr1),Row(json=jstr2)]) \
    .withColumn('json', F.from_json(F.col('json'), schema))

output = df.withColumn("id", F.map_keys("json").getItem(0)) \
            .withColumn("json", F.map_values("json").getItem(0))
output.show(truncate=False)

电流输出:

+-------------------+----+
|json               |id  |
+-------------------+----+
|[[1, 2], [3, 4]]   |id_1|
|[[9, 10], [11, 12]]|id_3|
+-------------------+----+

所需输出:

+---------------------+------+
|         json        |  id  |
+---------------------+------+
|[[[1, 2], [3, 4]]]   | id_1 |
|[[[5, 6], [7, 8]]]   | id_2 |
|[[[9,10], [11,12]]]  | id_3 |
|[[[13,14], [15,16]]] | id_4 |
|[[[17,18], [19,20]]] | id_5 |
+---------------------+------+

# NOTE: There is a large number of Ids in each JSON string
# so hard coded getItem(0), getItem(1) ... is not valid solution
                      ...
|[[[1000,1001], [10002,1003 ]]] | id_100000 |
+-------------------------------+-----------+ 

标签: pythonjsonapache-sparkpysparkapache-spark-sql

解决方案


地图列的一个explode将完成这项工作:

import pyspark.sql.functions as F

df.select(F.explode('json').alias('id', 'json')).show()
+----+--------------------+
|  id|                json|
+----+--------------------+
|id_1|    [[1, 2], [3, 4]]|
|id_2|    [[5, 6], [7, 8]]|
|id_3| [[9, 10], [11, 12]]|
|id_4|[[12, 14], [15, 16]]|
|id_5|[[17, 18], [19, 10]]|
+----+--------------------+

要在上一个问题中实现其他所需的输出,您可以再爆炸一次。这次你分解数组列,它来自地图的值。

df.select(
    F.explode('json').alias('id', 'json')
).select(
    'id', F.explode('json').alias('json')
).select(
    'id', 'json.*'
).show()
+----+---+---+
|  id|  a|  b|
+----+---+---+
|id_1|  1|  2|
|id_1|  3|  4|
|id_2|  5|  6|
|id_2|  7|  8|
|id_3|  9| 10|
|id_3| 11| 12|
|id_4| 12| 14|
|id_4| 15| 16|
|id_5| 17| 18|
|id_5| 19| 10|
+----+---+---+

推荐阅读