apache-spark - Value type is binary after a Spark Dataset mapGroups operation, even though the function returns a String
Problem description
Environment:
Spark version: 2.3.0
Run Mode: Local
Java version: Java 8
The Spark application tries to do the following:
1) Convert the input data into a Dataset[GenericRecord]
2) Group by the key property of the GenericRecord
3) After grouping, use mapGroups to iterate over the list of values and produce a result in String format
4) Write the result as strings to a text file.
The error occurs when writing the text file. Spark infers that the Dataset produced in step 3 has a binary column rather than a String column, even though the mapGroups function actually returns a String.
Is there a way to cast the column type, or to let Spark know that it is actually a string column rather than a binary one?
val dslSourcePath = args(0)
val filePath = args(1)
val targetPath = args(2)
val df = spark.read.textFile(filePath)
implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)
val mapResult = df.flatMap(abc => {
  JavaConversions.asScalaBuffer(some how return a list of Avro GenericRecord using a java library).seq
})
val groupResult = mapResult.groupByKey(result => String.valueOf(result.get("key")))
  .mapGroups((key, valueList) => {
    val result = StringBuilder.newBuilder.append(key).append(",").append(valueList.count(_ => true))
    result.toString()
  })
groupResult.printSchema()
groupResult.write.text(targetPath + "-result-" + System.currentTimeMillis())
The printed schema says the column is binary:
root
|-- value: binary (nullable = true)
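This binary schema is exactly what Encoders.kryo produces: it serializes any type, including String, into a single binary `value` column. A minimal sketch (the object name is illustrative) contrasting the schema of a Kryo encoder with the built-in string encoder:

```scala
import org.apache.spark.sql.{Encoder, Encoders}

object KryoSchemaDemo {
  def main(args: Array[String]): Unit = {
    // Encoders.kryo serializes any type to one binary column,
    // so even an Encoder[String] built this way has a binary schema.
    val kryoString: Encoder[String] = Encoders.kryo[String]
    println(kryoString.schema)   // single field `value` of type binary

    // The built-in string encoder keeps the column as a string,
    // which is what write.text requires.
    val plainString: Encoder[String] = Encoders.STRING
    println(plainString.schema)  // single field `value` of type string
  }
}
```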
Spark then raises an error because it cannot write binary data to a text file:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a string column, but you have binary.;
at org.apache.spark.sql.execution.datasources.text.TextFileFormat.verifySchema(TextFileFormat.scala:55)
at org.apache.spark.sql.execution.datasources.text.TextFileFormat.prepareWrite(TextFileFormat.scala:78)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:595)
Solution
As @user10938362 said, the cause is that the following code encodes all data, including the String results of mapGroups, as bytes:
implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)
Replacing it with the following enables Kryo encoding only for GenericRecord, so String results fall back to the built-in string encoder:
implicit def kryoEncoder: Encoder[GenericRecord] = Encoders.kryo
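Another option is to bypass implicit resolution entirely and pass the encoder at the call sites. A minimal sketch (simplified to plain strings instead of Avro GenericRecords; the object name and data are illustrative) that passes Encoders.STRING explicitly to groupByKey and mapGroups, so the result column stays a string even with the blanket Kryo encoder in scope:

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import scala.reflect.ClassTag

object MapGroupsStringDemo {
  // The problematic blanket encoder from the question.
  implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    val ds = spark.createDataset(Seq("a", "a", "b"))(Encoders.STRING)

    // Passing Encoders.STRING explicitly overrides the implicit Kryo encoder,
    // so the result schema is `value: string` and write.text succeeds.
    val counts = ds.groupByKey((s: String) => s)(Encoders.STRING)
      .mapGroups((key, values) => s"$key,${values.size}")(Encoders.STRING)

    counts.printSchema() // root |-- value: string (nullable = true)
    spark.stop()
  }
}
```

Explicit encoder arguments work because groupByKey and mapGroups both take their Encoder as a separate (normally implicit) parameter list.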