首页 > 解决方案 > PySpark解析结构的嵌套数组

问题描述

我想使用以下格式从 PySpark SQL 数据框中解析并获取特定键的值

我可以使用 UDF 来实现这一点,但处理 40 列 JSON 大小为 100MB 需要将近 20 分钟。也尝试过爆炸,但它为每个数组元素提供单独的行。但我只需要给定结构数组中键的特定值。

格式

array<struct<key:string,value:struct<int_value:string,string_value:string>>>

获取特定键值的函数

def getValueFunc(searcharray, searchkey):
    for val in searcharray:
        if val["key"] == searchkey:
            if val["value"]["string_value"] is not None:
                actual = val["value"]["string_value"]
                return actual
            elif val["value"]["int_value"] is not None:
                actual = val["value"]["int_value"]
                return str(actual)
            else:
                return "---"

.....
getValue = udf(getValueFunc, StringType())
....
# register the name rank udf template
spark.udf.register("getValue", getValue)
.....
df.select(getValue(col("event_params"), lit("category")).alias("event_category"))

标签: pysparkpyspark-sql

解决方案


对于 Spark 2.40+,您可以使用 SparkSQL 的filter()函数查找第一个匹配的数组元素,key == serarchkey然后检索其值。下面是一个 Spark SQL 片段模板(searchkey作为变量)来完成上面提到的第一部分。

stmt = '''filter(event_params, x -> x.key == "{}")[0]'''.format(searchkey)

使用函数运行上述stmtexpr(),并将值(StructType)分配给临时列f1,然后使用coalesce()函数检索非空值。

from pyspark.sql.functions import expr

df.withColumn('f1', expr(stmt)) \
    .selectExpr("coalesce(f1.value.string_value, string(f1.value.int_value),'---') AS event_category") \
    .show()

如果您在运行上述代码时遇到任何问题,请告诉我。


推荐阅读