首页 > 解决方案 > 用另一个(布尔)数组列子集一个数组列

问题描述

我有一个这样的数据框(在 Pyspark 2.3.1 中):

from pyspark.sql import Row

my_data = spark.createDataFrame([
  Row(a=[9, 3, 4], b=['a', 'b', 'c'], mask=[True, False, False]),
  Row(a=[7, 2, 6, 4], b=['w', 'x', 'y', 'z'], mask=[True, False, True, False])
])
my_data.show(truncate=False)
#+------------+------------+--------------------------+
#|a           |b           |mask                      |
#+------------+------------+--------------------------+
#|[9, 3, 4]   |[a, b, c]   |[true, false, false]      |
#|[7, 2, 6, 4]|[w, x, y, z]|[true, false, true, false]|
#+------------+------------+--------------------------+

现在我想使用该mask列来对ab列进行子集化:

my_desired_output = spark.createDataFrame([
  Row(a=[9], b=['a']),
  Row(a=[7, 6], b=['w', 'y'])
])
my_desired_output.show(truncate=False)
#+------+------+
#|a     |b     |
#+------+------+
#|[9]   |[a]   |
#|[7, 6]|[w, y]|
#+------+------+

实现这一目标的“惯用”方式是什么?我目前的解决方案涉及map对底层 RDD 进行处理并使用 Numpy 进行子集化,这似乎不太优雅:

import numpy as np

def subset_with_mask(row):
    mask = np.asarray(row.mask)
    a_masked = np.asarray(row.a)[mask].tolist()
    b_masked = np.asarray(row.b)[mask].tolist()
    return Row(a=a_masked, b=b_masked)

my_desired_output = spark.createDataFrame(my_data.rdd.map(subset_with_mask))

这是最好的方法,还是我可以使用 Spark SQL 工具做一些更好的事情(不那么冗长和/或更高效)?

标签: apache-sparkpysparkapache-spark-sqlpyspark-sql

解决方案


一种选择是使用 UDF,您可以选择根据数组中的数据类型对其进行专门化:

import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T

def _mask_list(lst, mask):
    return np.asarray(lst)[mask].tolist()

mask_array_int = F.udf(_mask_list, T.ArrayType(T.IntegerType()))
mask_array_str = F.udf(_mask_list, T.ArrayType(T.StringType()))

my_desired_output = my_data
my_desired_output = my_desired_output.withColumn(
    'a', mask_array_int(F.col('a'), F.col('mask'))
)
my_desired_output = my_desired_output.withColumn(
    'b', mask_array_str(F.col('b'), F.col('mask'))
)

推荐阅读