python - 基于列条件的 PySpark drop-dupes
问题描述
Spark 仍然是新手,我正在尝试尽可能干净有效地进行最终转换。
假设我有一个如下所示的数据框
+------+--------+
|ID | Hit |
+------+--------+
|123 | 0 |
|456 | 1 |
|789 | 0 |
|123 | 1 |
|123 | 0 |
|789 | 1 |
|1234 | 0 |
| 1234 | 0 |
+------+--------+
我试图以一个新的数据框结束(或两个,取决于更有效的方法),如果一行在“命中”中有一个 1,它不能有一个命中为 0 的行,如果有, 0 将根据 ID 列达到不同的级别。
这是我尝试过的方法之一,但我不确定这是否是 1. 可能最有效的方法 2. 可能最干净的方法
dfhits = df.filter(df.Hit == 1)
dfnonhits = df.filter(df.Hit == 0)
dfnonhitsdistinct = dfnonhits.filter(~dfnonhits['ID'].isin(dfhits))
Enddataset 如下所示:
+------+--------+
|ID | Hit |
+------+--------+
|456 | 1 |
|123 | 1 |
|789 | 1 |
|1234 | 0 |
+------+--------+
解决方案
# Creating the Dataframe.
from pyspark.sql.functions import col
df = sqlContext.createDataFrame([(123,0),(456,1),(789,0),(123,1),(123,0),(789,1),(500,0),(500,0)],
['ID','Hit'])
df.show()
+---+---+
| ID|Hit|
+---+---+
|123| 0|
|456| 1|
|789| 0|
|123| 1|
|123| 0|
|789| 1|
|500| 0|
|500| 0|
+---+---+
total
这个想法是找到Hit
per ID
,如果它大于 0,则意味着 中至少1
存在一个Hit
。因此,当此条件为 时true
,我们将删除所有rows
值为Hit
0 的内容。
# Registering the dataframe as a temporary view.
df.registerTempTable('table_view')
df=sqlContext.sql(
'select ID, Hit, sum(Hit) over (partition by ID) as sum_Hit from table_view'
)
df.show()
+---+---+-------+
| ID|Hit|sum_Hit|
+---+---+-------+
|789| 0| 1|
|789| 1| 1|
|500| 0| 0|
|500| 0| 0|
|123| 0| 1|
|123| 1| 1|
|123| 0| 1|
|456| 1| 1|
+---+---+-------+
df = df.filter(~((col('Hit')==0) & (col('sum_Hit')>0))).drop('sum_Hit').dropDuplicates()
df.show()
+---+---+
| ID|Hit|
+---+---+
|789| 1|
|500| 0|
|123| 1|
|456| 1|
+---+---+
推荐阅读
- python - 如何改进这个递归函数?
- tensorflow - CNN 模型的输出为 1.0 或 0.0
- javascript - 错误错误:找不到带有路径的控件:'responses -> response'(Angular 8)FormArray
- cassandra - Cassandra Where 子句中的 IN 查询
- antd - Ant deisgn 为数组字段设置字段值
- sql - 使用两种不同的条件获取 sum / count 的结果
- python - 为什么两种情况下的输出不同?
- office-js - 在 Excel Addin JS Addin 中使用本机 Excel 图标
- c++ - noexcept 运算符编译时检查
- c# - 动态添加对程序集的引用到 MSBuild