python - 对 pyspark 中的列进行重新分区如何影响分区数?
问题描述
我有一个包含一百万条记录的数据框。看起来像这样 -
df.show()
+--------------------+--------------------++-------------
| feature1| feature2| domain |
+--------------------+--------------------++-------------
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1 |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain2 |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1 |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain2 |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1 |
Spark 中的理想分区大小为 128 MB,假设域列有两个唯一值(域 1 和域 2),考虑到这一点,我有两个问题 -
如果我这样做
df.repartition("domain")
并且如果一个分区无法容纳特定域密钥的所有数据,应用程序会失败还是会根据数据自动创建合适的分区?假设在上面的数据中,基于域键已经发生了重新分区,所以会有两个分区(唯一的键是 domain1 和 domain2)。现在假设 domain1 和 domain2 重复了 1000000 次,我将基于域进行自加入。因此,对于每个域,我将获得大约 10^12 条记录。考虑到我们有两个分区,并且在连接过程中分区的数量没有变化,这两个新分区是否能够处理 1000000 条记录?
解决方案
答案取决于数据的大小。当一个分区不能保存属于一个分区值的所有数据(例如domain1
)时,将创建更多分区,最多创建spark.sql.shuffle.partitions
多个。如果您的数据太大,即一个分区会超过 2GB 的限制(另请参阅为什么 Spark RDD 分区对 HDFS 有 2GB 限制?),重新分区将导致OutOfMemoryError。
就像提供完整答案的旁注一样:能够将数据放入一个分区中并不一定意味着只为一个分区值生成一个分区。这取决于 - 其中包括 - 执行者的数量以及数据之前的分区方式。Spark 会尽量避免不必要的洗牌,因此可以为一个分区值生成多个分区。
因此,为了防止作业失败,您应该调整spark.sql.shuffle.partitions
或将所需的分区数repartition
与分区列一起传递。
推荐阅读
- javascript - Twitch API 和显示在线状态
- regex - java regex pattern.compile Vs 匹配器
- sql - 如何使用联接从两个表中获取不匹配的记录
- java - 超类中的私有变量如何在这里的子类中继承?
- asp.net-mvc - 整数值的数据注释验证
- angular - android 版本的 Ionic cordova 插件
- html - 如何将背景图像浮动到左侧?
- javascript - 如何在节点控制台中记录深度嵌套的对象
- javascript - 如何在ajax的授权和cors的第二个响应中显示内容?
- python - 'my_app' 模块未找到错误 Django - 执行使用 my_app 模型的脚本