首页 > 解决方案 > 将 GenericRecord 转换为 DF

问题描述

我在流式传输中有一个要求,我必须将 GenericRecord 转换为 DatFrame,以便我可以使用 EXPLODE 和 DF 中可用的其他功能。所以首先,我正在研究如何将 GenericRecord 转换为 DF。

我检查了下面的 URL,它有助于将记录转换为 DF。但我无法理解如何将类 SchemaConverterUtils 添加到 avro 对象。

如何将 RDD [GenericRecord] 转换为 scala 中的数据框?

当我尝试编辑时,它给了我只读文件。我是 scala/java 的新手。你能帮我理解如何做到这一点。

谢谢

标签: scalaapache-sparkavrospark-structured-streaming

解决方案


关于那篇文章,spark-avro库已被 DataBricks 弃用并捐赠给 Spark。

ABRiS库提供了一个 UDF 来将 的列转换Array[Byte]为复杂类型的列,并最终转换为DataFrame

在您的情况下,您应该首先进行几次转换。

import org.apache.spark.sql.DataFrame
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.examples.data.generation.AvroDataUtils

val spark: SparkSession = SparkSession
    .builder().master("local[*]").getOrCreate()
// read data into an RDD of GenericRecord called "genericRecordRdd"
// Have your schema in string format in a variable called "stringSchema"
import spark.implicits._
val domainDF: DataFrame = genericRecordRdd
        .map(AvroDataUtils.recordToBytes)
        .toDF("value")
        .select(from_avro(col("value"), stringSchema) as 'data).select("data.*")

AvroDataUtils.recordToBytes是 ABRiS 库的一部分,可将对象GenericRecord转换为Array[Byte]. 然后你创建一个DataFrame只有一列的,它被称为"value"。此时,您已准备好使用from_avroUDF。按照网站上的文档,您确实有其他选择,但根据您的描述,我认为这将是最接近的。


推荐阅读