首页 > 解决方案 > `combineByKey`,pyspark

问题描述

只是想知道这到底是做什么的?我明白keyBy,但我很难理解那到底是什么combineByKey。我已经阅读了页面(链接),但仍然不明白。

df.rdd.keyBy(
        lambda row: row['id']
    ).combineByKey(
        lambda row: [row],
        lambda rows, row: rows + [row],
        lambda rows1, rows2: rows1 + rows2,
    )
)

标签: pythonapache-sparkpyspark

解决方案


简而言之,combineByKey 允许您明确指定聚合(或减少)rdd 的 3 个阶段。

1.当第一次遇到单行时,它做了什么?

在您提供的示例中,该行被放入列表中。

2.当单行遇到之前减少的行时,它会做什么?

在该示例中,先前减少的行已经是一个列表,我们将新行添加到它并返回新的扩展列表。

3. 对先前减少的两个行做了什么?

在上面的示例中,两行都已经是列表,我们返回一个新列表,其中包含来自这两行的项目。

这些链接中提供了更多、解释清楚的分步示例:

http://hadoopexam.com/adi/index.php/spark-blog/90-how-combinebykey-works-in-spark-step-by-step-explaination

http://etlcode.com/index.php/blog/info/Bigdata/Apache-Spark-Difference-between-reduceByKey-groupByKey-and-combineByKey

第二个链接的一个关键解释是:

在此处输入图像描述

让我们看看 combineByKey 在我们的用例中是如何工作的。当 combineByKey 浏览每个元素时,即对于分区 1 - (Messi,45),它有一个以前没有见过的键,当它移动到下一个 (Messi,48) 时,它会得到一个以前见过的键。当它第一次看到一个元素时, combineByKey() 使用名为 createCombiner 的函数为该键上的累加器创建一个初始值。即它使用梅西作为键和45作为值。因此,该键(梅西)的累加器的当前值变为 45。现在,下次 combineByKey() 在同一分区上看到相同的键时,它不使用 createCombiner,而是使用带有累加器当前值的第二个函数 mergeValue(45)和新值 48。

由于所有这些都在不同的分区中并行发生,因此相同的键可能存在于具有其他累加器集的其他分区上。因此,当必须合并来自不同分区的结果时,它使用 mergeCombiners 函数。


推荐阅读