首页 > 解决方案 > PySpark - 在分组后选择列具有非连续值的行

问题描述

我有一个形式的数据框:

|user_id| action | day |
------------------------
| d25as | AB     | 2   |
| d25as | AB     | 3   |
| d25as | AB     | 5   |
| m3562 | AB     | 1   |
| m3562 | AB     | 7   |
| m3562 | AB     | 9   |
| ha42a | AB     | 3   |
| ha42a | AB     | 4   |
| ha42a | AB     | 5   |

我想过滤掉连续几天看到的用户,如果他们至少在一个非连续的一天里没有看到的话。生成的数据框应为:

|user_id| action | day |
------------------------
| d25as | AB     | 2   |
| d25as | AB     | 3   |
| d25as | AB     | 5   |
| m3562 | AB     | 1   |
| m3562 | AB     | 7   |
| m3562 | AB     | 9   |

最后一个用户已被删除,因为他只是连续几天出现。有谁知道如何在火花中做到这一点?

标签: apache-sparkdataframegroup-bypyspark

解决方案


阅读中间的评论。代码将是不言自明的。

from pyspark.sql.functions import udf, collect_list, explode
#Creating the DataFrame
values = [('d25as','AB',2),('d25as','AB',3),('d25as','AB',5),
          ('m3562','AB',1),('m3562','AB',7),('m3562','AB',9),
          ('ha42a','AB',3),('ha42a','AB',4),('ha42a','AB',5)]
df = sqlContext.createDataFrame(values,['user_id','action','day'])
df.show() 
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|  d25as|    AB|  2|
|  d25as|    AB|  3|
|  d25as|    AB|  5|
|  m3562|    AB|  1|
|  m3562|    AB|  7|
|  m3562|    AB|  9|
|  ha42a|    AB|  3|
|  ha42a|    AB|  4|
|  ha42a|    AB|  5|
+-------+------+---+

# Grouping together the days in one list.
df = df.groupby(['user_id','action']).agg(collect_list('day'))
df.show()
+-------+------+-----------------+
|user_id|action|collect_list(day)|
+-------+------+-----------------+
|  ha42a|    AB|        [3, 4, 5]|
|  m3562|    AB|        [1, 7, 9]|
|  d25as|    AB|        [2, 3, 5]|
+-------+------+-----------------+

# Creating a UDF to check if the days are consecutive or not. Only keep False ones.
check_consecutive = udf(lambda row: sorted(row) == list(range(min(row), max(row)+1)))
df = df.withColumn('consecutive',check_consecutive(col('collect_list(day)')))\
      .where(col('consecutive')==False)
df.show()
+-------+------+-----------------+-----------+
|user_id|action|collect_list(day)|consecutive|
+-------+------+-----------------+-----------+
|  m3562|    AB|        [1, 7, 9]|      false|
|  d25as|    AB|        [2, 3, 5]|      false|
+-------+------+-----------------+-----------+

# Finally, exploding the DataFrame from above to get the result.
df = df.withColumn("day", explode(col('collect_list(day)')))\
       .drop('consecutive','collect_list(day)')
df.show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|  m3562|    AB|  1|
|  m3562|    AB|  7|
|  m3562|    AB|  9|
|  d25as|    AB|  2|
|  d25as|    AB|  3|
|  d25as|    AB|  5|
+-------+------+---+

推荐阅读