首页 > 解决方案 > 数据帧的 Spark 联合不计算?

问题描述

我正在尝试合并这些数据帧,我使用 G_ID 不是 Null 或 MCOM.T_ID 不是 null 并使用修剪,计数没有出现,它从 1 小时开始运行。300 个任务中只剩下 3 个任务。请建议我该如何调试?是 null 导致问题我该如何调试?

在此处输入图像描述

在此处输入图像描述

 val table1 = spark.sql(""" SELECT  trim(C_ID) AS PC_ID FROM ab.CIDS WHERE 
  _UPDT_TM >= '2020-02-01 15:14:39.527'  """)

val table2 = spark.sql(""" SELECT trim(C_ID) AS PC_ID   FROM ab.MIDS MCOM INNER
 JOIN ab.VD_MBR VDBR
  ON Trim(MCOM.T_ID) = Trim(VDBR.T_ID) AND Trim(MCOM.G_ID) = Trim(VDBR.G_ID)
 AND Trim(MCOM.C123M_CD) IN ('BBB', 'AAA') WHERE MCOM._UPDT_TM >= '2020-02-01 15:14:39.527'
 AND Trim(VDBR.BB_CD) IN ('BBC') """)

var abc=table1.select("PC_ID").union(table2.select("PC_ID"))

even tried this --> filtered = abc.filter(row => !row.anyNull);

标签: scaladataframeapache-sparkapache-spark-sql

解决方案


看起来您有数据倾斜问题。查看“摘要指标”,很明显(至少)四分之三的分区是空的,因此您正在消除 spark 可以为您提供的大部分潜在并行化。

虽然它会导致一个 shuffle 步骤(其中数据在不同的执行者之间通过网络移动),但 a.repartition()将有助于平衡所有分区中的数据并创建更多有效的工作单元以在可用内核之间传播。这很可能会加快您的count().

根据经验,您可能希望在调用.repartition()时将参数设置为至少集群中的核心数。将其设置得更高将导致任务更快地完成(观看进度很有趣),但会增加一些管理开销到作业运行所需的总时间。如果任务太小(即每个分区没有足够的数据),那么有时调度程序会感到困惑并且也不会使用整个集群。总体而言,找到正确数量的分区是一种平衡行为。


推荐阅读