首页 > 解决方案 > Spark - 将函数应用于没有 UDF 的数据框的列

问题描述

我知道如何使用 UDF 将函数应用于数据框的列。但是,我要创建的函数访问外部服务。基本上,它通过 REST API 提交文本,然后添加一个新的响应列。不建议在访问外部服务时使用 UDF。事实上,当我尝试使用 UDF 执行此操作时,我遇到了零星的错误。

实现这一目标的最佳实践是什么?我不认为我的问题是特定于代码的,但我将在下面添加一些作为示例:

val entityDf = df.select(col("Text"),
                            col("coordinates"),
                            col("LocX"),
                            col("LocY"))
                            .withColumn("TextClass", functionUdf(col("text")))

我应该在此处添加尝试将此函数用作 UDF 时遇到的错误,以防万一我正在寻找错误问题的解决方案:

线程“主”org.apache.spark.SparkException 中的异常:无法执行用户定义的函数($anonfun$5: (string) => string)

def testFunc(text: String): String ={
    val gson = new Gson()

    val result = Http("url")
      .postData(f"""postdata""")
      .header("Content-Type", "application/json")
      .header("Charset", "UTF-8")
      .option(HttpOptions.readTimeout(10000))
      .asString

    val rootJson = gson.fromJson(result.body, classOf[rootNerJson])

    if(rootJson.classes.length > 0){
      return rootJson.classes(0).label
    }

    return "Null"
  }

val functionUdf = udf[String, String](testFunc)

编辑:堆栈跟踪:

线程“主”org.apache.spark.SparkException 中的异常:无法在 org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF) 处执行用户定义的函数($anonfun$5: (string) => string) .scala:1075) 在 org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:144) 在 org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:48 ) 在 scala.collection 的 org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:30).TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 在 scala.collection。 TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation $$anonfun$apply$22.applyOrElse(Optimizer.scala:1147) at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$22.applyOrElse(Optimizer.scala:1142) at org.apache.spark .sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org .apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) 在 org.apache。 spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1。在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:27​​2) 在 org.apache.spark.sql.catalyst.trees 应用(TreeNode.scala:27​​2)。 TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees。 TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:27​​2) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun $transformDown$1.apply(TreeNode.scala:27​​2) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:27​​2) 在 org.apache.spark.sql。 org.apache.spark.sql.catalyst.trees.TreeNode 上的催化剂.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)。mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode. scala:27​​2) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:27​​2) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun $transformDown$1.apply(TreeNode.scala:27​​2) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 在 org.apache.spark.sql.catalyst。 org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) 上 org.apache.spark.sql.catalyst.trees.TreeNode 上的 trees.TreeNode.mapProductIterator(TreeNode.scala:187)。 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1 处的 transformDown(TreeNode.scala:27​​2)。在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:27​​2) 在 org.apache.spark.sql.catalyst.trees 应用(TreeNode.scala:27​​2)。 TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees。 TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:27​​2) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun $transformDown$1.apply(TreeNode.scala:27​​2) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:27​​2) 在 org.apache.spark.sql。 org.apache.spark.sql.catalyst.trees.TreeNode 上的催化剂.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)。mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode. scala:27​​2) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:27​​2) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun $transformDown$1.apply(TreeNode.scala:27​​2) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 在 org.apache.spark.sql.catalyst。 org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) 上 org.apache.spark.sql.catalyst.trees.TreeNode 上的 trees.TreeNode.mapProductIterator(TreeNode.scala:187)。 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1 处的 transformDown(TreeNode.scala:27​​2)。在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:27​​2) 在 org.apache.spark.sql.catalyst.trees 应用(TreeNode.scala:27​​2)。 TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees。 TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:27​​2) at org.apache.spark.sql.catalyst.trees.TreeNode.transform( TreeNode.scala:256) at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1142) at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer. scala:1141) 在 org.apache.spark.sql.catalyst.rules。RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply (RuleExecutor.scala:82) 在 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) 在 scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) 在 scala.collection.mutable.WrappedArray。 foldLeft(WrappedArray.scala:35) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) at org.apache.spark.sql.catalyst.rules。 RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute( RuleExecutor.scala:74) 在 org.apache.spark.sql。execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78) at org.apache.spark.sql.execution.QueryExecution.sparkPlan$ lzycompute(QueryExecution.scala:84) at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala: 89) 在 org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832) 在 org.apache.spark.sql 的 org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89) .Dataset.head(Dataset.scala:2153) 在 org.apache.spark.sql.Dataset.take(Dataset.scala:2366) 在 org.apache.spark.sql.Dataset.showString(Dataset.scala:245) 在org.apache.spark.sql.Dataset。show(Dataset.scala:644) at org.apache.spark.sql.Dataset.show(Dataset.scala:603) at org.apache.spark.sql.Dataset.show(Dataset.scala:612) at OCRJson$$ anonfun$main$1.apply(OCRJson.scala:112) at OCRJson$$anonfun$main$1.apply(OCRJson.scala:24) at scala.collection.immutable.List.foreach(List.scala:392) at OCRJson$ .main(OCRJson.scala:24) at OCRJson.main(OCRJson.scala) 原因:com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: 预期 BEGIN_OBJECT 但在第 1 行第 1 列路径 $ 处为字符串。 google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:226) 在 com.google.gson.Gson.fromJson(Gson.java:922) 在 com.google.gson.Gson.fromJson(Gson. java:887) 在 com.google.gson.Gson.fromJson(Gson.java:836) 在 com.google.gson。Gson.fromJson(Gson.java:808) 在 OCRJson$$anonfun$main$1.OCRJson$$anonfun$$ner$1(OCRJson.scala:69) 在 OCRJson$$anonfun$main$1$$anonfun$5.apply(OCRJson .scala:78) 在 OCRJson$$anonfun$main$1$$anonfun$5.apply(OCRJson.scala:78) 在 org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala :92) 在 org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:91) 在 org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala :1072) ... 87 更多原因:java.lang.IllegalStateException: 预期 BEGIN_OBJECT 但在 com.google.gson.stream.JsonReader.beginObject(JsonReader.java:385) 在 com.google.gson.stream.JsonReader.beginObject(JsonReader.java:385) 的第 1 行第 1 列路径为字符串.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:215) ... 97 更多还有 97 个

标签: scalaapache-spark

解决方案


推荐阅读