首页 > 解决方案 > Google Cloud Dataproc 上的 Pyspark 作业失败

问题描述

我创建了一个包含 1 个主节点和 10 个节点的 Dataproc 集群。它们都具有相同的 CPU 和内存配置:32 个 vCPU,120 GB 内存。当我提交一份处理大量数据和计算的工作时。作业失败。

从日志记录来看,我不确定是什么导致了失败。但是我看到了来自 tJob#: job-c46fc848-6: Container killed by YARN for超出内存限制的内存相关错误消息。使用了 24.1 GB 的 24 GB 物理内存。考虑提升 spark.yarn.executor.memoryOverhead。

所以我尝试了一些从其他帖子中找到的解决方案。例如,当从“作业”控制台提交作业时,我尝试在“属性”部分增加 spark.executor.memoryOverhead 和 spark.driver.maxResultSize。作业# find-duplicate-job-c46fc848-7 仍然失败。

我还看到了警告消息,但不确定它的含义:18/06/04 17:13:25 WARN org.apache.spark.storage.BlockManagerMasterEndpoint: No more replicas available for rdd_43_155 !

我将尝试创建一个更高级别的集群,看看它是否有效。但我怀疑它是否能解决问题,因为具有 1 个主节点和 10 个节点、32 个 vCPU、120 GB 内存的集群已经非常强大。

希望能得到高级用户和专家的一些帮助。提前致谢!

标签: apache-sparkpysparkgoogle-cloud-dataproc

解决方案


失败的根本原因与自交叉连接引起的内存有关。即使我不断增加 CPU 功率和内存,它仍然失败。所以这个解决方案是以下的组合。

  1. 使用 repartition() 函数在连接之后、下一次转换之前重新分区。这将解决数据倾斜问题。例如:df_joined = df_joined.repartition(partitions)
  2. 广播右表。
  3. 将其分解为 10 次迭代。在每次迭代中,我只处理左表的 1/10 与右表的完整数据连接。

请参阅示例代码:

groups = 10 <br/>
for x in range(0, groups): 
  df_joined = df1.join(broadcast(df2), (df1.authors == df2.authors)).where((col("df1.content_id") % groups == x)) 

结合以上 3 种方法,我能够在 1.5 小时内完成这项工作,并且只使用了 1 个主节点和 4 个工作节点(每个 vm 8 个 CPU 和 30 GB)。


推荐阅读