apache-spark - 使用窗口操作替换所有列值?
问题描述
嗨数据框创建如下。
df = sc.parallelize([
(1, 3),
(2, 3),
(3, 2),
(4,2),
(1, 3)
]).toDF(["id",'t'])
它显示如下。
+---+---+
| id| t|
+---+---+
| 1| 3|
| 2| 3|
| 3| 2|
| 4| 2|
| 1| 3|
+---+---+
我的主要目标是,我想用重复的次数替换每列中的重复值。
所以我尝试了流动代码它没有按预期工作。
from pyspark.sql.functions import col
column_list = ["id",'t']
w = Window.partitionBy(column_list)
dfmax=df.select(*((count(col(c)).over(w)).alias(c) for c in df.columns))
dfmax.show()
+---+---+
| id| t|
+---+---+
| 2| 2|
| 2| 2|
| 1| 1|
| 1| 1|
| 1| 1|
+---+---+
我的预期输出将是
+---+---+
| id| t|
+---+---+
| 2| 3|
| 1| 3|
| 1| 1|
| 1| 1|
| 2| 3|
+---+---+
解决方案
如果我对您的理解正确,那么您正在寻找的只是:
df.select(*[count(c).over(Window.partitionBy(c)).alias(c) for c in df.columns]).show()
#+---+---+
#| id| t|
#+---+---+
#| 2| 3|
#| 2| 3|
#| 1| 2|
#| 1| 3|
#| 1| 2|
#+---+---+
这与您发布的内容之间的区别在于我们一次仅按一列分区。
请记住,DataFrame 是无序的。如果你想保持你的行顺序,你可以使用添加一个排序列pyspark.sql.functions.monotonically_increasing_id()
:
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("order", monotonically_increasing_id())\
.select(*[count(c).over(Window.partitionBy(c)).alias(c) for c in df.columns])\
.sort("order")\
.drop("order")\
.show()
#+---+---+
#| id| t|
#+---+---+
#| 2| 3|
#| 1| 3|
#| 1| 2|
#| 1| 2|
#| 2| 3|
#+---+---+
推荐阅读
- regex - 正则表达式以两种不同的方案捕获版本
- css - 中心 WordPress 元数据类别和时间元素 (Divs)
- python - 如何调用需要 CLI 参数的 python 方法?
- java - Matlab - 如何编译 JAR 文件
- vba - 过滤并循环遍历最小日期的记录列表
- remote-access - 使用远程桌面时未执行编码的 UI 测试脚本
- oracle - OBIEE 12c 管理工具 - 重复定义 - 元数据脚本执行错误
- javafx - 如何在 JavaFX 中创建一个只有原始外观关闭按钮的窗口?
- arrays - Swift 2 Object Mapper 类追加不起作用,结果为零
- r - 我可以拼凑一个我想要选择的列的名称吗?