scala - Why does Spark infer a binary instead of an Array[Byte] when creating a DataFrame?
Question
Basically, I have a DataFrame that consists of a "Name" and a "Values" field. The first field is a String, while the second is an Array[Byte]. What I want to do with each record of this DataFrame is to apply a function, using a UDF, and create a new column. This works perfectly when "Values" is an Array[Int]. However, when it is an Array[Byte], the following error appears:
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(Values)' due to data type mismatch: argument 1 requires array<tinyint> type, however, '`Values`' is of binary type.;;
'Project [Name#47, Values#48, UDF(Values#48) AS TwoTimes#56]
+- Project [_1#44 AS Name#47, _2#45 AS Values#48]
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#44, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#45]
      +- ExternalRDD [obj#43]
The full code is the following:
scala> val df1 = spark.sparkContext.parallelize(Seq(("one", Array[Byte](1, 2, 3, 4, 5)), ("two", Array[Byte](6, 7, 8, 9, 10)))).toDF("Name", "Values")
df1: org.apache.spark.sql.DataFrame = [Name: string, Values: binary]
scala> df1.show
+----+----------------+
|Name| Values|
+----+----------------+
| one|[01 02 03 04 05]|
| two|[06 07 08 09 0A]|
+----+----------------+
scala> val twice = udf { (values: Seq[Byte]) =>
| val result = Array.ofDim[Byte](values.length)
| for (i <- values.indices)
| result(i) = (2 * values(i).toInt).toByte
| result
| }
twice: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BinaryType,Some(List(ArrayType(ByteType,false))))
scala> val df2 = df1.withColumn("TwoTimes", twice('Values))
I understand that an error like this arises because of a wrong data type (expected: Array[Byte], but Binary was found), but what I do not understand is why Spark has inferred my Array[Byte] as Binary. Can someone explain this to me, please?
If I have to use Binary instead of Array[Byte], how should I handle it within my UDF?
To clarify, my original UDF does not use a trivial for loop; I understand that in this example the loop could be replaced by the map method.
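For reference, the same doubling expressed with map might look like the following (a minimal sketch only; the argument-type question discussed below applies to it just the same):
val twiceWithMap = udf { (values: Seq[Byte]) =>
  // Double each byte and return a fresh Array[Byte]
  values.map(v => (2 * v).toByte).toArray
}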
Solution
In Spark, an Array[Byte] is represented as BinaryType. From the documentation:
public class BinaryType extends DataType
The data type representing Array[Byte] values. Please use the singleton DataTypes.BinaryType.
So Array[Byte] and Binary are one and the same; the difference that actually matters here is between them and Seq[Byte], and that is what causes the error.
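One way to see the two mappings side by side is Spark's ScalaReflection helper (an internal API, shown here purely for illustration, not for production use):
import org.apache.spark.sql.catalyst.ScalaReflection

// Array[Byte] maps to the opaque BinaryType...
ScalaReflection.schemaFor[Array[Byte]].dataType  // BinaryType
// ...while Seq[Byte] maps to array<tinyint>, hence the mismatch in the error
ScalaReflection.schemaFor[Seq[Byte]].dataType    // ArrayType(ByteType,false)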
To solve the problem, simply replace Seq[Byte] with Array[Byte] in the udf:
val twice = udf { (values: Array[Byte]) =>
  // Array[Byte] matches the BinaryType column directly
  val result = Array.ofDim[Byte](values.length)
  for (i <- values.indices)
    result(i) = (2 * values(i).toInt).toByte
  result
}
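With that one change, the original withColumn call goes through. A sketch of the expected result for the df1 built above (the exact rendering may differ slightly between Spark versions):
val df2 = df1.withColumn("TwoTimes", twice('Values))
df2.show
// +----+----------------+----------------+
// |Name|          Values|        TwoTimes|
// +----+----------------+----------------+
// | one|[01 02 03 04 05]|[02 04 06 08 0A]|
// | two|[06 07 08 09 0A]|[0C 0E 10 12 14]|
// +----+----------------+----------------+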