首页 > 解决方案 > pySpark forEachPartition - 代码在哪里执行

问题描述

我在 2.3 版中使用 pySpark(在我当前的开发系统中无法更新到 2.4),并且对foreachPartition有以下问题。

首先是一点背景:据我所知,pySparkUDFs强制 Python 代码在 Python 实例中的 Java 虚拟机 (JVM) 之外执行,这会导致性能消耗。由于我需要将一些 Python 函数应用于我的数据并希望最大限度地减少开销成本,我的想法是至少将一组可处理的数据加载到驱动程序中并将其作为 Pandas-DataFrame 处理。无论如何,这将导致 Spark 失去并行性优势。然后我读到foreachPartition将函数应用于分区内的所有数据,因此允许并行处理。

我现在的问题是:

  1. 当我通过 应用 Python 函数时foreachPartition,Python 执行是否发生在驱动程序进程中(因此分区数据通过网络传输到我的驱动程序)?

  2. 是在内部按行处理数据foreachPartition(意味着每个 RDD 行都被一一传输到 Python 实例),还是分区数据一次处理(意味着,例如,整个分区被传输到实例和由一个 Python 实例整体处理)?

预先感谢您的意见!


编辑:

我之前使用的驱动程序解决方案看起来像这样,取自这里

for partition in rdd.mapPartitions(lambda partition: [list(partition)]).toLocalIterator():
    # Do stuff on the partition

文档 rdd.toLocalIterator()中可以看出,它提供了必要的功能:

返回包含此 RDD 中所有元素的迭代器。迭代器将消耗与此 RDD中最大分区一样多的内存。

标签: pythonpandasapache-sparkpyspark

解决方案


幸运的是,我偶然发现了 Mrinal 的这个很好的解释(在此处mapPartitions回答)。

mapPartitions在 RDD 的每个分区上应用一个函数。因此,如果分区分布在不同的节点上,则可以使用并行化。在这些节点上创建了处理 Python 函数所必需的相应 Python 实例。虽然foreachPartition只应用一个函数(例如将您的数据写入 .csv 文件),但mapPartitions也会返回一个新的 RDD。因此,使用foreachPartition对我来说是错误的选择。

为了回答我的第二个问题:函数喜欢mapUDFs创建一个新的 Python 实例并从 DataFrame/RDD 中逐行传递数据,从而导致大量开销。foreachPartitionmapPartitions(两个 RDD 函数)将整个分区转移到 Python 实例。

此外,使用生成器还减少了对传输的分区数据进行迭代所需的内存量(分区作为迭代器对象处理,而每一行都通过迭代该对象来处理)。

一个示例可能如下所示:

def generator(partition):
    """
    Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)

    @partition: iterator-object of partition
    """

    for row in partition:
        yield [word.lower() for word in row["text"]]


df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()


#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+

希望这可以帮助面临类似问题的人:)


推荐阅读