scala - 迭代 Spark Dataframe 运行缓慢
问题描述
我想验证现有列的数据并根据某些条件创建新列。
问题:我有大约 500 列和 9K 行 (9000) 的数据集。根据我的逻辑,如果其中一列具有任何空值,则针对该列创建新列,并将原始列的空值设置为 1,其余为 0。
但是下面的简单代码需要几个小时才能完成,尽管我的数据并不大。
dataset_.schema.fields.map(c => {
if(dataset_.filter(col(c.name).isNull).count() > 0)
{
dataset_ = dataset_.withColumn(c.name + "_isNull", when(col(c.name).isNull, 1).otherwise(0))
}
})
请帮助我优化我的代码或向我提供反馈以用不同的方法实现它。
注意:我在大集群(火花纱)上尝试过同样的事情。Google Dataproc 集群(3 个工作节点,机器类型 32 个 vCPU,280 GB 内存)
解决方案
同时计算所有计数:
val convert = df.select(
df.columns.map(c => (count(c) =!= count("*")).alias(c)): _*
).first.getValuesMap[Boolean](df.columns)
并使用结果添加列
convert.collect { case (c, true) => c }.foldLeft(df) {
(df, c) => df.withColumn(s"${c}_isNull", when(col(c).isNull, 1).otherwise(0))
}
推荐阅读
- next.js - 未找到 NextJS 图像模块
- selenium - 将测试对象转换为字符串
- powerbi - Power BI 数据源错误:数据源“未知”登录失败
- python - 如何设置 ThreeLineAvatarIconListItem 的宽度?
- php - 如何在php中将属性类作为变量?
- c - MPI 数组未声明
- python - 数组元素的布尔轮廓
- python - Wtforms - 如何管理 SQLite JSON 类型的呈现和创建动态表单
- cloud - Google Cloud PUB/SUB 每次调用都会返回不同数量的消息
- mongodb - 创建mongodb atlas集群后怎么办?