首页 > 解决方案 > 在生产中对 Spark Row 执行 .getAs 操作时出现异常。在本地运行良好

问题描述

我有一个给定一组键的通用代码,一个数据帧会在数据帧中找到该键集的重复项不起作用
的代码:

case class DuplicateRecord(
    datasetName: String,
    duplicateKeys: String,
    duplicateCount: Long
  )

def findDuplicatesInDF(
    spark: SparkSession
    inputName: String,
    inputDataFrame: DataFrame,
    groupColumns: Seq[String]): Dataset[DuplicateRecord] = {

    import spark.implicits._

    val keys = groupColumns.map(x => col(x))
    val idToCounts = inputDataFrame
      .groupBy(keys: _*)
      .agg(count(keys(0)).as("duplicateKeyCount"))

    idToCounts
      .filter(col("duplicateKeyCount") > 1)
      .map { idToCount =>
        DuplicateRecord(
          inputName,
          groupColumns.map(x => idToCount.getAs(x).toString).mkString(","),
          idToCount.getAs("duplicateKeyCount").toString.toLong)
      }
  }

上面的代码在本地运行良好。但是,它在生产中失败了

 Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:375)

有效的代码:

case class DuplicateRecord(
    datasetName: String,
    duplicateKeys: String,
    duplicateCount: Long
  )

case class IdToCounts(
    mergedKey: String,
    duplicateKeyCount: Long
  )

def findDuplicatesInDF(
    spark: SparkSession,
    inputName: String,
    inputDataFrame: DataFrame,
    groupColumns: Seq[String]): Dataset[DuplicateRecord] = {

    import spark.implicits._

    val keys = groupColumns.map(x => col(x))
    val idToCounts = inputDataFrame
      .withColumn("mergedKey", concat_ws(",", keys: _*))
      .groupBy(col("mergedKey"))
      .agg(count(col("mergedKey")).as("duplicateKeyCount"))
      .as[IdToCounts]

    idToCounts
      .filter(idToCount => idToCount.duplicateKeyCount > 1)
      .map { idToCount =>
        DuplicateRecord(inputName, idToCount.mergedKey, idToCount.duplicateKeyCount)
      }
  }

我知道这与 Spark 在本地模式下的单个 JVM 实例上运行这一事实有关。但是由于有多个执行程序,并且数据在 prod 中被分区,这导致了一种不确定的行为,即 spark 无法理解从哪里提取数据以完成操作。但是,我想了解确切的问题,并且在与此相关的现有堆栈溢出问题中没有得到令人信服的答案。对此的任何见解都将非常有帮助!谢谢!

标签: scalaapache-sparkapache-spark-sql

解决方案


推荐阅读