首页 > 解决方案 > 使用窗口操作替换所有列值?

问题描述

嗨数据框创建如下。

df = sc.parallelize([
    (1, 3),
    (2, 3),
    (3, 2),
    (4,2),
    (1, 3)
]).toDF(["id",'t']) 

它显示如下。

+---+---+
| id|  t|
+---+---+
|  1|  3|
|  2|  3|
|  3|  2|
|  4|  2|
|  1|  3|
+---+---+

我的主要目标是,我想用重复的次数替换每列中的重复值。

所以我尝试了流动代码它没有按预期工作。

from pyspark.sql.functions import col
column_list = ["id",'t']
w = Window.partitionBy(column_list)
dfmax=df.select(*((count(col(c)).over(w)).alias(c) for c in df.columns))
dfmax.show()
+---+---+
| id|  t|
+---+---+
|  2|  2|
|  2|  2|
|  1|  1|
|  1|  1|
|  1|  1|
+---+---+

我的预期输出将是

+---+---+
| id|  t|
+---+---+
|  2|  3|
|  1|  3|
|  1|  1|
|  1|  1|
|  2|  3|
+---+---+

标签: apache-sparkpysparkapache-spark-sqlpyspark-sql

解决方案


如果我对您的理解正确,那么您正在寻找的只是:

df.select(*[count(c).over(Window.partitionBy(c)).alias(c) for c in df.columns]).show()
#+---+---+
#| id|  t|
#+---+---+
#|  2|  3|
#|  2|  3|
#|  1|  2|
#|  1|  3|
#|  1|  2|
#+---+---+

这与您发布的内容之间的区别在于我们一次仅按一列分区。

请记住,DataFrame 是无序的。如果你想保持你的行顺序,你可以使用添加一个排序列pyspark.sql.functions.monotonically_increasing_id()

from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("order", monotonically_increasing_id())\
    .select(*[count(c).over(Window.partitionBy(c)).alias(c) for c in df.columns])\
    .sort("order")\
    .drop("order")\
    .show()
#+---+---+
#| id|  t|
#+---+---+
#|  2|  3|
#|  1|  3|
#|  1|  2|
#|  1|  2|
#|  2|  3|
#+---+---+

推荐阅读