首页 > 解决方案 > 在 PySpark 中将字符串列标记和排序为多列

问题描述

我有一个 PySpark 数据框,它有一个字符串列,其中包含一个逗号分隔的未排序值列表(最多 5 个值),如下所示:

+----+----------------------+
|col1|col2                  |
+----+----------------------+
|1   | 'b1, a1, c1'         |
|2   | 'a2, b2'             |
|3   | 'e3, d3, a3, c3, b3' |
+----+----------------------+

我想标记化col2,然后根据标准对它们进行排名,并创建 5 个新的不同列col2,如果标记化返回少于 5 个值,则可能具有空值。排序很简单:如果token在set1中,则将其放入第一个新列(col3),否则如果在set2中,则将其放入第二个新列(col4),依此类推。

比方说:

set1 = ['a1', 'a2', 'a3', 'a4', 'a5'], 
set2 = ['b1', 'b2', 'b3', 'b4', 'b5'], 
set3 = ['c1', 'c2', 'c3', 'c4', 'c5'], 
set4 = ['d1', 'd2', 'd3', 'd4', 'd5'], 
set5 = ['e1', 'e2', 'e3', 'e4', 'e5']

然后对上面的数据框应用更改将产生以下数据框:

+----+----+----+----+----+----+
|col1|col3|col4|col5|col6|col7|
+----+----+----+----+----+----+
|1   |'a1'|'b1'|'c1'|null|null|
|2   |'a2'|'b2'|null|null|null|
|3   |'a3'|'b3'|'c3'|'d3'|'e3'|
+----+----+----+----+----+----+

我知道如何进行标记化:

df.withColumn('col2', split('col2', ', ')) \
  .select(col('col1'), *[col('col2')[i].alias('col' + str(i + 3)) for i in range(0, 5)]) \
  .show()

但在创建新列之前无法弄清楚如何进行排名。任何帮助将非常感激。

标签: pythondataframepyspark

解决方案


我找到了解决方案。我们可以使用 udf 根据集合对该列中的字符串列表进行排序。然后在 udf 函数之上应用标记化并从中创建不同的列。

set1 = set(['a1', 'a2', 'a3', 'a4', 'a5'])
set2 = set(['b1', 'b2', 'b3', 'b4', 'b5'])
set3 = set(['c1', 'c2', 'c3', 'c4', 'c5'])
set4 = set(['d1', 'd2', 'd3', 'd4', 'd5'])
set5 = set(['e1', 'e2', 'e3', 'e4', 'e5'])

def sortCategories(x):
    resultArray = ['unknown' for i in range(5)]
    tokens = x.split(',')
    for token in tokens:
        if token in set1:
            resultArray[0] = token
        elif token in set2:
            resultArray[1] = token
        elif token in set3:
            resultArray[2] = token
        elif token in set4:
            resultArray[3] = token
        elif token in set5:
            resultArray[4] = token
    return ','.join(resultArray)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
orderUdfString = udf(lambda s: sortCategories(s), StringType())
df = df.withColumn('col2', orderUdfString('col2'))
df = df.withColumn('col_temp', split('col2', ',')) \
  .select([col(c) for c in df.columns] + [col('col_temp')[i].alias('col' + str(i + 1)) for i in range(0, 5)])

推荐阅读