python - pySpark forEachPartition - 代码在哪里执行
问题描述
我在 2.3 版中使用 pySpark(在我当前的开发系统中无法更新到 2.4),并且对foreachPartition有以下问题。
首先是一点背景:据我所知,pySparkUDFs
强制 Python 代码在 Python 实例中的 Java 虚拟机 (JVM) 之外执行,这会导致性能消耗。由于我需要将一些 Python 函数应用于我的数据并希望最大限度地减少开销成本,我的想法是至少将一组可处理的数据加载到驱动程序中并将其作为 Pandas-DataFrame 处理。无论如何,这将导致 Spark 失去并行性优势。然后我读到foreachPartition
将函数应用于分区内的所有数据,因此允许并行处理。
我现在的问题是:
当我通过 应用 Python 函数时
foreachPartition
,Python 执行是否发生在驱动程序进程中(因此分区数据通过网络传输到我的驱动程序)?是在内部按行处理数据
foreachPartition
(意味着每个 RDD 行都被一一传输到 Python 实例),还是分区数据一次处理(意味着,例如,整个分区被传输到实例和由一个 Python 实例整体处理)?
预先感谢您的意见!
编辑:
我之前使用的驱动程序解决方案看起来像这样,取自这里:
for partition in rdd.mapPartitions(lambda partition: [list(partition)]).toLocalIterator():
# Do stuff on the partition
从文档 rdd.toLocalIterator()
中可以看出,它提供了必要的功能:
返回包含此 RDD 中所有元素的迭代器。迭代器将消耗与此 RDD中最大分区一样多的内存。
解决方案
幸运的是,我偶然发现了 Mrinal 的这个很好的解释(在此处mapPartitions
回答)。
mapPartitions
在 RDD 的每个分区上应用一个函数。因此,如果分区分布在不同的节点上,则可以使用并行化。在这些节点上创建了处理 Python 函数所必需的相应 Python 实例。虽然foreachPartition
只应用一个函数(例如将您的数据写入 .csv 文件),但mapPartitions
也会返回一个新的 RDD。因此,使用foreachPartition
对我来说是错误的选择。
为了回答我的第二个问题:函数喜欢map
或UDFs
创建一个新的 Python 实例并从 DataFrame/RDD 中逐行传递数据,从而导致大量开销。foreachPartition
和mapPartitions
(两个 RDD 函数)将整个分区转移到 Python 实例。
此外,使用生成器还减少了对传输的分区数据进行迭代所需的内存量(分区作为迭代器对象处理,而每一行都通过迭代该对象来处理)。
一个示例可能如下所示:
def generator(partition):
"""
Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)
@partition: iterator-object of partition
"""
for row in partition:
yield [word.lower() for word in row["text"]]
df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()
#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+
希望这可以帮助面临类似问题的人:)
推荐阅读
- css - CSS 边框半径简写
- cassandra - upgradesstables 在 dse cassandra 升级中不起作用
- ibm-cloud - 如何将不同的 Watson Discovery 绑定到每个 CF 应用程序
- pdf - 我无法搜索 pdf 文件
- c++ - char* 到自定义类型
- html - 如何将 calc() 和 var() 用于具有动态值的 CSS 按钮?
- kotlin - micronaut 不允许从同一基类(实现接口)派生的两个控制器向路由器注册
- javascript - 使用 javascript/jquery 触发 onchange 事件时更新 DOM 中的哈希值
- html - 如何将内容保留在容器中,但在 CSS 中扩展其背景全宽?
- c++ - graphics.h 没有打开 BGI 可执行文件