首页 > 解决方案 > 使用 countDistinct 倾斜数据

问题描述

我有一个包含 3 列的 PySpark DataFrame:“客户”、“产品”、“日期”。我想运行 groupBy 操作:

df.groupBy("product", "date").agg(F.countDistinct("client"))

所以我想计算每天购买产品的客户数量。这会导致巨大的倾斜数据(实际上,它会因为内存而导致错误)。我一直在学习腌制技术。据我了解,它可以与“sum”或“count”一起使用,向 groupBy 添加一个新列并执行第二次聚合,但由于countDistinct聚合方法,我不知道在这种情况下如何应用它们。

在这种情况下我该如何应用它?

标签: pysparkskew

解决方案


我建议不要countDistinct在这里使用并连续使用 2 个聚合来实现您想要的效果,特别是因为您的数据存在偏差。它可能如下所示:

import pyspark.sql.functions as F
new_df = (df
  .groupBy("product", "date", "client")
  .agg({}) # getting unique ("product", "date", "client") tuples
  .groupBy("product", "date")
  .agg(F.count('*').alias('clients'))
)

这里的第一个聚合确保你有一个 DataFrame,每个不同的(“product”,“date”,“client”)元组都有一行,第二个是计算每个(“product”,“date”)对的客户端数量。这样您就不必再担心偏差了,因为 Spark 会知道为您进行部分聚合(而不是countDistinct强制发送与每个 ("product", "date") 对相对应的所有单独的 "client" 值)到一个节点)。


推荐阅读