scala - 数据帧的 Spark 联合不计算?
问题描述
我正在尝试合并这些数据帧,我使用 G_ID 不是 Null 或 MCOM.T_ID 不是 null 并使用修剪,计数没有出现,它从 1 小时开始运行。300 个任务中只剩下 3 个任务。请建议我该如何调试?是 null 导致问题我该如何调试?
val table1 = spark.sql(""" SELECT trim(C_ID) AS PC_ID FROM ab.CIDS WHERE
_UPDT_TM >= '2020-02-01 15:14:39.527' """)
val table2 = spark.sql(""" SELECT trim(C_ID) AS PC_ID FROM ab.MIDS MCOM INNER
JOIN ab.VD_MBR VDBR
ON Trim(MCOM.T_ID) = Trim(VDBR.T_ID) AND Trim(MCOM.G_ID) = Trim(VDBR.G_ID)
AND Trim(MCOM.C123M_CD) IN ('BBB', 'AAA') WHERE MCOM._UPDT_TM >= '2020-02-01 15:14:39.527'
AND Trim(VDBR.BB_CD) IN ('BBC') """)
var abc=table1.select("PC_ID").union(table2.select("PC_ID"))
even tried this --> filtered = abc.filter(row => !row.anyNull);
解决方案
看起来您有数据倾斜问题。查看“摘要指标”,很明显(至少)四分之三的分区是空的,因此您正在消除 spark 可以为您提供的大部分潜在并行化。
虽然它会导致一个 shuffle 步骤(其中数据在不同的执行者之间通过网络移动),但 a.repartition()
将有助于平衡所有分区中的数据并创建更多有效的工作单元以在可用内核之间传播。这很可能会加快您的count()
.
根据经验,您可能希望在调用.repartition()
时将参数设置为至少集群中的核心数。将其设置得更高将导致任务更快地完成(观看进度很有趣),但会增加一些管理开销到作业运行所需的总时间。如果任务太小(即每个分区没有足够的数据),那么有时调度程序会感到困惑并且也不会使用整个集群。总体而言,找到正确数量的分区是一种平衡行为。
推荐阅读
- python - 执行 Selenium Python 自动化脚本时出错
- sql - 转换函数 SQL Server
- three.js - 场景中两个对象之间的三个 js RayCast
- javascript - 下面的代码是异步的吗?
- ios - 在 Xcode 10 中设置状态栏样式
- java - 如何从属性名称中给出的json中读取属性值
- caching - 如何在 .net-core-2.1 中使用内存缓存避免缓存未命中
- mysql - 使用 mySql 的简单 React 无法读取表记录
- c# - 在多个构造函数参数中注入具有相同接口的不同实现
- powershell - 在 PowerShell 中从 CNAME 解析主机名