apache-spark - PySpark - 在分组后选择列具有非连续值的行
问题描述
我有一个形式的数据框:
|user_id| action | day |
------------------------
| d25as | AB | 2 |
| d25as | AB | 3 |
| d25as | AB | 5 |
| m3562 | AB | 1 |
| m3562 | AB | 7 |
| m3562 | AB | 9 |
| ha42a | AB | 3 |
| ha42a | AB | 4 |
| ha42a | AB | 5 |
我想过滤掉连续几天看到的用户,如果他们至少在一个非连续的一天里没有看到的话。生成的数据框应为:
|user_id| action | day |
------------------------
| d25as | AB | 2 |
| d25as | AB | 3 |
| d25as | AB | 5 |
| m3562 | AB | 1 |
| m3562 | AB | 7 |
| m3562 | AB | 9 |
最后一个用户已被删除,因为他只是连续几天出现。有谁知道如何在火花中做到这一点?
解决方案
阅读中间的评论。代码将是不言自明的。
from pyspark.sql.functions import udf, collect_list, explode
#Creating the DataFrame
values = [('d25as','AB',2),('d25as','AB',3),('d25as','AB',5),
('m3562','AB',1),('m3562','AB',7),('m3562','AB',9),
('ha42a','AB',3),('ha42a','AB',4),('ha42a','AB',5)]
df = sqlContext.createDataFrame(values,['user_id','action','day'])
df.show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
| d25as| AB| 2|
| d25as| AB| 3|
| d25as| AB| 5|
| m3562| AB| 1|
| m3562| AB| 7|
| m3562| AB| 9|
| ha42a| AB| 3|
| ha42a| AB| 4|
| ha42a| AB| 5|
+-------+------+---+
# Grouping together the days in one list.
df = df.groupby(['user_id','action']).agg(collect_list('day'))
df.show()
+-------+------+-----------------+
|user_id|action|collect_list(day)|
+-------+------+-----------------+
| ha42a| AB| [3, 4, 5]|
| m3562| AB| [1, 7, 9]|
| d25as| AB| [2, 3, 5]|
+-------+------+-----------------+
# Creating a UDF to check if the days are consecutive or not. Only keep False ones.
check_consecutive = udf(lambda row: sorted(row) == list(range(min(row), max(row)+1)))
df = df.withColumn('consecutive',check_consecutive(col('collect_list(day)')))\
.where(col('consecutive')==False)
df.show()
+-------+------+-----------------+-----------+
|user_id|action|collect_list(day)|consecutive|
+-------+------+-----------------+-----------+
| m3562| AB| [1, 7, 9]| false|
| d25as| AB| [2, 3, 5]| false|
+-------+------+-----------------+-----------+
# Finally, exploding the DataFrame from above to get the result.
df = df.withColumn("day", explode(col('collect_list(day)')))\
.drop('consecutive','collect_list(day)')
df.show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
| m3562| AB| 1|
| m3562| AB| 7|
| m3562| AB| 9|
| d25as| AB| 2|
| d25as| AB| 3|
| d25as| AB| 5|
+-------+------+---+
推荐阅读
- google-apps-script - How do I paste format with google sheets script?
- matlab - Is Simulink build something like .exe from our model?
- android - How to add ssl certificate in couchbase android programmatically
- powershell - 使用 Invoke-Command 在远程机器上保存 XML
- swift - 如果观察者很忙,如何删除新元素?
- c# - Disable Tenant and logout all users Aspnetboilerplate
- wso2 - How to use private NTP service (instedad pool.ntp.org) in the syncronization of WSO2 MB cluster nodes?
- java - Docker swarm:org.apache.kafka.common.errors.TimeoutException:等待节点分配超时
- javascript - 使用 flatpickr 使用变量设置 minDate
- outlook - Win7 和 Win10 操作系统机器上 Outlook 进程的虚拟内存利用率