scala - Spark - 将函数应用于没有 UDF 的数据框的列
问题描述
我知道如何使用 UDF 将函数应用于数据框的列。但是,我要创建的函数访问外部服务。基本上,它通过 REST API 提交文本,然后添加一个新的响应列。不建议在访问外部服务时使用 UDF。事实上,当我尝试使用 UDF 执行此操作时,我遇到了零星的错误。
实现这一目标的最佳实践是什么?我不认为我的问题是特定于代码的,但我将在下面添加一些作为示例:
val entityDf = df.select(col("Text"),
col("coordinates"),
col("LocX"),
col("LocY"))
.withColumn("TextClass", functionUdf(col("text")))
我应该在此处添加尝试将此函数用作 UDF 时遇到的错误,以防万一我正在寻找错误问题的解决方案:
线程“主”org.apache.spark.SparkException 中的异常:无法执行用户定义的函数($anonfun$5: (string) => string)
def testFunc(text: String): String ={
val gson = new Gson()
val result = Http("url")
.postData(f"""postdata""")
.header("Content-Type", "application/json")
.header("Charset", "UTF-8")
.option(HttpOptions.readTimeout(10000))
.asString
val rootJson = gson.fromJson(result.body, classOf[rootNerJson])
if(rootJson.classes.length > 0){
return rootJson.classes(0).label
}
return "Null"
}
val functionUdf = udf[String, String](testFunc)
编辑:堆栈跟踪:
线程“主”org.apache.spark.SparkException 中的异常:无法在 org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF) 处执行用户定义的函数($anonfun$5: (string) => string) .scala:1075) 在 org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:144) 在 org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:48 ) 在 scala.collection 的 org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:30).TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 在 scala.collection。 TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation $$anonfun$apply$22.applyOrElse(Optimizer.scala:1147) at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$22.applyOrElse(Optimizer.scala:1142) at org.apache.spark .sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org .apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) 在 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) 在 org.apache。 spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1。在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 在 org.apache.spark.sql.catalyst.trees 应用(TreeNode.scala:272)。 TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees。 TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun $transformDown$1.apply(TreeNode.scala:272) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 在 org.apache.spark.sql。 org.apache.spark.sql.catalyst.trees.TreeNode 上的催化剂.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)。mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode. scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun $transformDown$1.apply(TreeNode.scala:272) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 在 org.apache.spark.sql.catalyst。 org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) 上 org.apache.spark.sql.catalyst.trees.TreeNode 上的 trees.TreeNode.mapProductIterator(TreeNode.scala:187)。 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1 处的 transformDown(TreeNode.scala:272)。在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 在 org.apache.spark.sql.catalyst.trees 应用(TreeNode.scala:272)。 TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees。 TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun $transformDown$1.apply(TreeNode.scala:272) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 在 org.apache.spark.sql。 org.apache.spark.sql.catalyst.trees.TreeNode 上的催化剂.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)。mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode. scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun $transformDown$1.apply(TreeNode.scala:272) 在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 在 org.apache.spark.sql.catalyst。 org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) 上 org.apache.spark.sql.catalyst.trees.TreeNode 上的 trees.TreeNode.mapProductIterator(TreeNode.scala:187)。 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1 处的 transformDown(TreeNode.scala:272)。在 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 在 org.apache.spark.sql.catalyst.trees 应用(TreeNode.scala:272)。 TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees。 TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode.transform( TreeNode.scala:256) at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1142) at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer. scala:1141) 在 org.apache.spark.sql.catalyst.rules。RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply (RuleExecutor.scala:82) 在 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) 在 scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) 在 scala.collection.mutable.WrappedArray。 foldLeft(WrappedArray.scala:35) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) at org.apache.spark.sql.catalyst.rules。 RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute( RuleExecutor.scala:74) 在 org.apache.spark.sql。execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78) at org.apache.spark.sql.execution.QueryExecution.sparkPlan$ lzycompute(QueryExecution.scala:84) at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala: 89) 在 org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832) 在 org.apache.spark.sql 的 org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89) .Dataset.head(Dataset.scala:2153) 在 org.apache.spark.sql.Dataset.take(Dataset.scala:2366) 在 org.apache.spark.sql.Dataset.showString(Dataset.scala:245) 在org.apache.spark.sql.Dataset。show(Dataset.scala:644) at org.apache.spark.sql.Dataset.show(Dataset.scala:603) at org.apache.spark.sql.Dataset.show(Dataset.scala:612) at OCRJson$$ anonfun$main$1.apply(OCRJson.scala:112) at OCRJson$$anonfun$main$1.apply(OCRJson.scala:24) at scala.collection.immutable.List.foreach(List.scala:392) at OCRJson$ .main(OCRJson.scala:24) at OCRJson.main(OCRJson.scala) 原因:com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: 预期 BEGIN_OBJECT 但在第 1 行第 1 列路径 $ 处为字符串。 google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:226) 在 com.google.gson.Gson.fromJson(Gson.java:922) 在 com.google.gson.Gson.fromJson(Gson. java:887) 在 com.google.gson.Gson.fromJson(Gson.java:836) 在 com.google.gson。Gson.fromJson(Gson.java:808) 在 OCRJson$$anonfun$main$1.OCRJson$$anonfun$$ner$1(OCRJson.scala:69) 在 OCRJson$$anonfun$main$1$$anonfun$5.apply(OCRJson .scala:78) 在 OCRJson$$anonfun$main$1$$anonfun$5.apply(OCRJson.scala:78) 在 org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala :92) 在 org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:91) 在 org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala :1072) ... 87 更多原因:java.lang.IllegalStateException: 预期 BEGIN_OBJECT 但在 com.google.gson.stream.JsonReader.beginObject(JsonReader.java:385) 在 com.google.gson.stream.JsonReader.beginObject(JsonReader.java:385) 的第 1 行第 1 列路径为字符串.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:215) ... 97 更多还有 97 个
解决方案
推荐阅读
- javascript - 通过 AJAX 在 JavaScript 中“手动”刷新页面
- php - 如何在 PHP 中将“流资源”返回类型添加到函数中
- php - 不存在的地方
- python - 使用 virtualenv / virtualenvwrapper 在特定目录中创建 virtualenv
- python - 如何移动存储在列表中的数组行
- python - 卡在 Python 中的嵌套 for 循环上
- awk - 使用 AWK 获取整个记录信息
- python - PyFlink 有 Kinesis 连接器吗?
- python - ValueError: s 必须是标量,或者与 seaborn 可视化中的 x 和 y 大小相同
- javascript - Gulp-filter 的工作很奇怪而且错误