首页 > 解决方案 > PySpark DataFrame 中作为新列的行模式

问题描述

是否可以根据先前列的最大值添加新列,其中先前的列是字符串文字。考虑以下数据框:

df = spark.createDataFrame(
    [
        ('1',25000,"black","black","white"),
        ('2',16000,"red","black","white"),
    ],
    ['ID','cash','colour_body','colour_head','colour_foot']
)

那么目标框架应该是这样的:

df = spark.createDataFrame(
    [
        ('1',25000,"black","black","white", "black" ),
        ('2',16000,"red","black","white", "white" ),
    ],
    ['ID','cash','colour_body','colour_head','colour_foot', 'max_v']
)

如果没有可检测的最大值,则应使用最后一个有效颜色。

是否有某种可用的计数器可能性或 udf?

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


定义一个 UDFstatistics.mode来计算具有所需语义的逐行模式:

import statistics

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def mode(*x):
    try:
        return statistics.mode(x)
    except statistics.StatisticsError:
        return x[-1]

mode = udf(mode, StringType())

df.withColumn("max_v", mode(*[col(c) for c in df.columns if 'colour' in c])).show()

+---+-----+-----------+-----------+-----------+-----+
| ID| cash|colour_body|colour_head|colour_foot|max_v|
+---+-----+-----------+-----------+-----------+-----+
|  1|25000|      black|      black|      white|black|
|  2|16000|        red|      black|      white|white|
+---+-----+-----------+-----------+-----------+-----+

推荐阅读