首页 > 解决方案 > 如何编写数据集编码器以支持将函数映射到 Scala Spark 中的 org.apache.spark.sql.Dataset[String]

问题描述

从 Spark 1.6 迁移到 Spark 2.2* 带来了错误“错误:无法找到存储在“数据集”中的类型的编码器。尝试将方法应用于从查询 parquet 表返回的数据集时的原始类型(Int、String 等)。我过度简化了我的代码来演示同样的错误。代码查询 parquet 文件以返回以下数据类型:'org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]' 我应用一个函数来提取字符串和整数,返回一个字符串。返回以下数据类型: Array[String] 接下来,我需要执行需要单独函数的大量操作。在这个测试函数中,我尝试附加一个字符串,产生与我的详细示例相同的错误。我尝试了一些编码器示例和“案例”的使用,但还没有提出可行的解决方案。

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, 
cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x" 
(1)+","+s.getDecimal(1);

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d2.take(1)
20/03/25 19:01:08 WARN TaskSetManager: Stage 3 contains a task of very large size (131 KB). The 
maximum recommended task size is 100 KB.
res10: Array[String] = Array(ab04006000504304,1522194407.95162)

scala> def dd(s:String){
 | s + "some string"
 | }
dd: (s: String)Unit

scala> var d3 = d2.map{s=> dd(s) }
<console>:47: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support 
for serializing other types will be added in future releases.

为了进一步提炼问题,我相信这种情况(尽管我没有尝试所有可能的解决方案)可以进一步简化为以下代码:

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String){
 | s + "hi"
 | }
f: (s: String)Unit

scala> var test2 = test.map{ s => f(s) }
<console>:42: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other types 
will be added in future releases.
   var test2 = test.map{ s => f(s) }

标签: scalaapache-sparkdatasetencoder

解决方案


第一个解决方案不适用于我的初始(生产)数据集,而是产生错误“org.apache.spark.SparkException:任务不可序列化”(有趣的是,虽然两者都存储为相同的数据类型(org.apache.spark.sql .Dataset[String] = [value: string]) 我认为是相关的。我为我的测试数据集添加了另一个解决方案,它消除了初始编码器错误,并且如图所示实际上适用于我的玩具问题,不会上升到生产数据集。对于我的应用程序在从 1.6 到 2.3 版本 spark 的迁移中被搁置的确切原因有点困惑,因为多年来我不必对我的应用程序进行任何特殊调整,并且已经成功运行它以进行最有可能的计算数以万亿计。其他探索包括将我的方法包装为可序列化,探索@transient 关键字,利用“org.apache.spark.serializer.KryoSerializer”,将我的方法编写为函数并将所有变量更改为“vals”(关注“stack”上的相关帖子)。

scala>  import spark.implicits._
import spark.implicits._

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 |   val r = s + "hi"
 |   return r
 |   }
 f: (s: String)String

 scala> var d2 =  test.map{s => f(s)}(Encoders.STRING)
 d2: org.apache.spark.sql.Dataset[String] = [value: string]

 scala> d2.take(1)
 res0: Array[String] = Array(just some wordshi)

斯卡拉>


推荐阅读