How to find the set of columns with non-null values in PySpark

Problem description

I have a PySpark DataFrame with n columns (Column_1, Column_2 ... Column_n). I need to add one more column that holds a comma-separated collection of column names.

Condition: only when two or more columns have a value should the collection column be filled with their comma-separated column names; otherwise it stays null. For example, below is the data for three columns.

+----------+----------+----------+-----------------------------+
| column_1 | column_2 | column_3 | col collections             |
+----------+----------+----------+-----------------------------+
| -        | -        | -        | -                           |
| 1        | -        | -        | -                           |
| -        | 1        | -        | -                           |
| -        | -        | 1        | -                           |
| 1        | 1        | -        | column_1,column_2           |
| 1        | 1        | 1        | column_1,column_2,column_3  |
| 1        | -        | -        | -                           |
| -        | 1        | 1        | column_2,column_3           |
+----------+----------+----------+-----------------------------+
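
Restated: for each row, collect the names of the columns that have a value, and keep the result only when at least two of them do. A minimal pure-Python sketch of that per-row rule (the collection_for_row helper is hypothetical, only to illustrate the expected output):

def collection_for_row(row):
    """Names of the populated columns, or None when fewer than two are populated."""
    names = [name for name, value in row.items() if value is not None]
    return ','.join(names) if len(names) >= 2 else None

collection_for_row({'column_1': '1', 'column_2': '1', 'column_3': None})
# -> 'column_1,column_2'
collection_for_row({'column_1': '1', 'column_2': None, 'column_3': None})
# -> None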

Tags: dataframe, apache-spark, pyspark, apache-spark-sql

Solution


Here is one solution.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, udf
from pyspark.sql.types import StringType

# Reuse the active SparkSession, or create one if none exists.
spark = SparkSession.builder.getOrCreate()

# Sample data matching the table above; None marks a missing value.
pandas_df = pd.DataFrame({
    'column_1': [None, '1', None, None, '1', '1', '1'],
    'column_2': [None, None, '1', None, '1', '1', None],
    'column_3': [None, None, None, '1', None, '1', None]
})

df = spark.createDataFrame(pandas_df)
df.show()
# +--------+--------+--------+
# |column_1|column_2|column_3|
# +--------+--------+--------+
# |    null|    null|    null|
# |       1|    null|    null|
# |    null|       1|    null|
# |    null|    null|       1|
# |       1|       1|    null|
# |       1|       1|       1|
# |       1|    null|    null|
# +--------+--------+--------+


# Returns a UDF that maps a non-null cell to its column's name, and null to null.
def non_null_to_column_name(name):
    return udf(lambda value: None if value is None else name, StringType())

# Keep the concatenated string only when it names at least two columns
# (i.e. it contains a comma); otherwise return null.
atleast_two_udf = udf(lambda s: None if (s is None) or (',' not in s) else s,
                      StringType())

# Build one name-or-null column per input column.
cols = []
for name in df.columns:
    f = non_null_to_column_name(name)
    cols += [f(df[name])]

# concat_ws skips nulls, so only the populated columns end up in the string.
df = df.withColumn('collection', atleast_two_udf(concat_ws(',', *cols)))
df.show()
# +--------+--------+--------+--------------------+
# |column_1|column_2|column_3|          collection|
# +--------+--------+--------+--------------------+
# |    null|    null|    null|                null|
# |       1|    null|    null|                null|
# |    null|       1|    null|                null|
# |    null|    null|       1|                null|
# |       1|       1|    null|   column_1,column_2|
# |       1|       1|       1|column_1,column_2...|
# |       1|    null|    null|                null|
# +--------+--------+--------+--------------------+
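
The last collection value is only cut short by show()'s default truncation; df.show(truncate=False) would print the full strings. As a side note, the same result should also be reachable without Python UDFs, using only built-in column functions. A rough sketch under that assumption (not part of the original answer):

from pyspark.sql import functions as F

value_cols = ['column_1', 'column_2', 'column_3']

# For each column, emit its name when the cell is non-null (null otherwise).
name_cols = [F.when(F.col(c).isNotNull(), F.lit(c)) for c in value_cols]

# Count how many of the columns are populated in each row.
non_null_count = sum(F.col(c).isNotNull().cast('int') for c in value_cols)

# concat_ws skips nulls; keep the string only when two or more columns are populated.
df_alt = df.withColumn(
    'collection',
    F.when(non_null_count >= 2, F.concat_ws(',', *name_cols))
)
df_alt.show(truncate=False)

Staying with built-in functions keeps the work inside Spark's JVM and avoids the per-row Python round-trip of a UDF, which is usually the cheaper option.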
