scala - 如何编写数据集编码器以支持将函数映射到 Scala Spark 中的 org.apache.spark.sql.Dataset[String]
问题描述
从 Spark 1.6 迁移到 Spark 2.2* 带来了错误“错误:无法找到存储在“数据集”中的类型的编码器。尝试将方法应用于从查询 parquet 表返回的数据集时的原始类型(Int、String 等)。我过度简化了我的代码来演示同样的错误。代码查询 parquet 文件以返回以下数据类型:'org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]' 我应用一个函数来提取字符串和整数,返回一个字符串。返回以下数据类型: Array[String] 接下来,我需要执行需要单独函数的大量操作。在这个测试函数中,我尝试附加一个字符串,产生与我的详细示例相同的错误。我尝试了一些编码器示例和“案例”的使用,但还没有提出可行的解决方案。
scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string,
cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]
val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x"
(1)+","+s.getDecimal(1);
scala> var d2 = d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> d2.take(1)
20/03/25 19:01:08 WARN TaskSetManager: Stage 3 contains a task of very large size (131 KB). The
maximum recommended task size is 100 KB.
res10: Array[String] = Array(ab04006000504304,1522194407.95162)
scala> def dd(s:String){
| s + "some string"
| }
dd: (s: String)Unit
scala> var d3 = d2.map{s=> dd(s) }
<console>:47: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int,
String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support
for serializing other types will be added in future releases.
为了进一步提炼问题,我相信这种情况(尽管我没有尝试所有可能的解决方案)可以进一步简化为以下代码:
scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]
scala> def f(s: String){
| s + "hi"
| }
f: (s: String)Unit
scala> var test2 = test.map{ s => f(s) }
<console>:42: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._ Support for serializing other types
will be added in future releases.
var test2 = test.map{ s => f(s) }
解决方案
第一个解决方案不适用于我的初始(生产)数据集,而是产生错误“org.apache.spark.SparkException:任务不可序列化”(有趣的是,虽然两者都存储为相同的数据类型(org.apache.spark.sql .Dataset[String] = [value: string]) 我认为是相关的。我为我的测试数据集添加了另一个解决方案,它消除了初始编码器错误,并且如图所示实际上适用于我的玩具问题,不会上升到生产数据集。对于我的应用程序在从 1.6 到 2.3 版本 spark 的迁移中被搁置的确切原因有点困惑,因为多年来我不必对我的应用程序进行任何特殊调整,并且已经成功运行它以进行最有可能的计算数以万亿计。其他探索包括将我的方法包装为可序列化,探索@transient 关键字,利用“org.apache.spark.serializer.KryoSerializer”,将我的方法编写为函数并将所有变量更改为“vals”(关注“stack”上的相关帖子)。
scala> import spark.implicits._
import spark.implicits._
scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]
scala> def f(s: String): String = {
| val r = s + "hi"
| return r
| }
f: (s: String)String
scala> var d2 = test.map{s => f(s)}(Encoders.STRING)
d2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> d2.take(1)
res0: Array[String] = Array(just some wordshi)
斯卡拉>
推荐阅读
- xcode - 更新 xcode 和 ios
- antd - 如何使用 mobx 观察者与 antd 表列渲染功能
- python - 为什么我的 Python 代码返回 TypeError?
- html - 有没有一种简单的方法可以在页面上水平居中 Bootstrap 导航栏元素?
- google-apps-script - 如果单元格在值移动行中包含 X
- pandas - 如何将条形图中的单个条形更改为不同的颜色?
- jenkins - 有没有办法让 Jenkins 只构建 PR 而不是相应的分支,但也不删除分支作业?
- python-3.8 - Python Wheel 安装成功但无法导入模块
- python - 在python中发送和接收包
- javascript - 导入功能组件+元素类型无效时出错:需要字符串或类/函数但得到:未定义