首页 > 解决方案 > pyspark 累积计数不同

问题描述

我想生成一个网站的唯一访问者的每日累积计数,而 pyspark countDistinct 本机函数在移动/增长窗口内不起作用

对于以下数据:

+---+----+
|day|user|
+---+----+
|  1|   A|
|  2|   B|
|  3|   A|
|  4|   C|
|  5|   C|
|  5|   B|
+---+----+

会期望结果:

+---+---------+
|day|cum_count|
+---+---------+
|  1|        1| -> [A]
|  2|        2| -> [A,B]
|  3|        2| -> [A,B]
|  4|        3| -> [A,B,C]
|  5|        3| -> [A,B,C]
+---+---------+

PS:原始数据庞大,无法转储到pandas

标签: pyspark

解决方案


我可以实现这个结果,将问题从不同的累积计数转换为累积总和。问题是只保留该期间的第一次用户访问。

import pyspark.sql.functions as F
from pyspark.sql.window import Window

#example dataset
>>> data = sqlContext.createDataFrame([[1,'A'],[2,'B'],[3,'A'],[4,'C'],[5,'C'],[5,'B']],schema=['day','user'])
>>> data.show()
+---+----+
|day|user|
+---+----+
|  1|   A|
|  2|   B|
|  3|   A|
|  4|   C|
|  5|   C|
|  5|   B|
+---+----+

#enumerate each user visit
>>> data = data.withColumn('user_visit',F.row_number().over(Window.partitionBy('user').orderBy('day')))
>>> data.orderBy('user','day').show()
+---+----+----------+
|day|user|user_visit|
+---+----+----------+
|  1|   A|         1|
|  3|   A|         2|
|  2|   B|         1|
|  5|   B|         2|
|  4|   C|         1|
|  5|   C|         2|
+---+----+----------+

#Keep just the first visit
>>> data = data.withColumn('first_visit',F.when(F.col('user_visit') == 1,1))
>>> data.orderBy('day','user').show()
+---+----+----------+-----------+
|day|user|user_visit|first_visit|
+---+----+----------+-----------+
|  1|   A|         1|          1|
|  2|   B|         1|          1|
|  3|   A|         2|       null|
|  4|   C|         1|          1|
|  5|   B|         2|       null|
|  5|   C|         2|       null|
+---+----+----------+-----------+

# cumulative sum of first visits
>>> w = Window.partitionBy().orderBy('day').rangeBetween(Window.unboundedPreceding,0)
>>> data = data.withColumn('cum_count',F.sum('first_visit').over(w))
>>> data.orderBy('day','user').show()
+---+----+----------+-----------+---------+
|day|user|user_visit|first_visit|cum_count|
+---+----+----------+-----------+---------+
|  1|   A|         1|          1|        1|
|  2|   B|         1|          1|        2|
|  3|   A|         2|       null|        2|
|  4|   C|         1|          1|        3|
|  5|   B|         2|       null|        3|
|  5|   C|         2|       null|        3|
+---+----+----------+-----------+---------+

#aditional step to get the day total without duplicates
>>> data.groupBy('day').agg(F.max('cum_count')).show()
+---+--------------+
|day|max(cum_count)|
+---+--------------+
|  1|             1|
|  2|             2|
|  3|             2|
|  4|             3|
|  5|             3|
+---+--------------+

推荐阅读