首页 > 解决方案 > pyspark:将结构分解成列

问题描述

我创建了一个 udf,它返回一个未嵌套的 StructType。只是一个带有字段名称的混合类型(int、float)的数组。我想将它们分解/拆分成单独的列。请注意,这将创建大约 50 个新列。通过谷歌搜索,我找到了这个解决方案:

df_split = df.select('ID', 'my_struct.*')

这行得通。但是性能绝对糟糕,例如。无法使用。检查集群节点,这也仅使用 1 个核心。但这只能解释问题的一小部分。

那么什么是实现我的目标的好方法,为什么上面的解决方案这么慢?

编辑:

似乎是 udf 和拆分的特定组合导致性能不佳。这很慢:

df_udf = df.withColumn('udf', my_udf(df.input))
df_exploded = df_udf.select('input', 'udf.*')
df_exploded.show(5)

这很快:

df_udf = df.withColumn('udf', my_udf(df.input))
df_udf.cache()
df_exploded = df_udf.select('input', 'udf.*')
df_exploded.show(5)

标签: pysparkuser-defined-functions

解决方案


根据要求,我正在编辑答案。请考虑到,这在我的情况下适用于一个小型测试集群(5 个节点),只有我使用相对较小的数据集(5000 万)处理它。

似乎是 udf 和 split 的特定组合导致性能不佳。这很慢:

df_udf = df.withColumn('udf', my_udf(df.input))
df_exploded = df_udf.select('input', 'udf.*')
df_exploded.show(5)

这很快:

df_udf = df.withColumn('udf', my_udf(df.input))
df_udf.cache()
df_exploded = df_udf.select('input', 'udf.*')
df_exploded.show(5)

推荐阅读