首页 > 解决方案 > 迭代 Spark Dataframe 运行缓慢

问题描述

我想验证现有列的数据并根据某些条件创建新列。

问题:我有大约 500 列和 9K 行 (9000) 的数据集。根据我的逻辑,如果其中一列具有任何空值,则针对该列创建新列,并将原始列的空值设置为 1,其余为 0。

但是下面的简单代码需要几个小时才能完成,尽管我的数据并不大。

dataset_.schema.fields.map(c => {
  if(dataset_.filter(col(c.name).isNull).count() > 0)
  {
    dataset_ = dataset_.withColumn(c.name + "_isNull", when(col(c.name).isNull, 1).otherwise(0))
  }
})

请帮助我优化我的代码或向我提供反馈以用不同的方法实现它。

注意:我在大集群(火花纱)上尝试过同样的事情。Google Dataproc 集群(3 个工作节点,机器类型 32 个 vCPU,280 GB 内存)

标签: scalaapache-sparkspark-dataframeapache-spark-mllibgoogle-cloud-dataproc

解决方案


同时计算所有计数:

val convert = df.select(
  df.columns.map(c => (count(c) =!= count("*")).alias(c)): _*
).first.getValuesMap[Boolean](df.columns)

并使用结果添加列

convert.collect { case (c, true) => c }.foldLeft(df) {
  (df, c) => df.withColumn(s"${c}_isNull", when(col(c).isNull, 1).otherwise(0))
}

推荐阅读