pyspark - PySpark解析结构的嵌套数组
问题描述
我想使用以下格式从 PySpark SQL 数据框中解析并获取特定键的值
我可以使用 UDF 来实现这一点,但处理 40 列 JSON 大小为 100MB 需要将近 20 分钟。也尝试过爆炸,但它为每个数组元素提供单独的行。但我只需要给定结构数组中键的特定值。
格式
array<struct<key:string,value:struct<int_value:string,string_value:string>>>
获取特定键值的函数
def getValueFunc(searcharray, searchkey):
for val in searcharray:
if val["key"] == searchkey:
if val["value"]["string_value"] is not None:
actual = val["value"]["string_value"]
return actual
elif val["value"]["int_value"] is not None:
actual = val["value"]["int_value"]
return str(actual)
else:
return "---"
.....
getValue = udf(getValueFunc, StringType())
....
# register the name rank udf template
spark.udf.register("getValue", getValue)
.....
df.select(getValue(col("event_params"), lit("category")).alias("event_category"))
解决方案
对于 Spark 2.40+,您可以使用 SparkSQL 的filter()函数查找第一个匹配的数组元素,key == serarchkey
然后检索其值。下面是一个 Spark SQL 片段模板(searchkey作为变量)来完成上面提到的第一部分。
stmt = '''filter(event_params, x -> x.key == "{}")[0]'''.format(searchkey)
使用函数运行上述stmtexpr()
,并将值(StructType)分配给临时列f1
,然后使用coalesce()
函数检索非空值。
from pyspark.sql.functions import expr
df.withColumn('f1', expr(stmt)) \
.selectExpr("coalesce(f1.value.string_value, string(f1.value.int_value),'---') AS event_category") \
.show()
如果您在运行上述代码时遇到任何问题,请告诉我。
推荐阅读
- kubernetes - Kubernetes 部署因 istio-sidecar 注入而失败
- python - 验证模拟类方法调用的类实例部分(自身)
- python - for循环中的垂直堆栈numpy数组
- python - 如何在 python 中跨多处理器共享内存?
- typescript - 如果当我把它变成一个函数并将它应用到 NestJS 中的 Swagger 响应时,该类不能正常工作怎么办?
- android - 适用于双 SIM 卡手机的 Android 订阅管理器
- ios - 如何提高 Lottie 加载性能(在集合堆栈视图中加载 23 个 Lottie)
- flutter - 在 Flutter 中使用 network_image_mock 包进行图像测试
- android - Nativescript捏手势发生抖动
- php - 使用 PHP 5.2.0 在 Windows Server 中安装 SQLite