scala - 将 GenericRecord 转换为 DF
问题描述
我在流式传输中有一个要求,我必须将 GenericRecord 转换为 DatFrame,以便我可以使用 EXPLODE 和 DF 中可用的其他功能。所以首先,我正在研究如何将 GenericRecord 转换为 DF。
我检查了下面的 URL,它有助于将记录转换为 DF。但我无法理解如何将类 SchemaConverterUtils 添加到 avro 对象。
如何将 RDD [GenericRecord] 转换为 scala 中的数据框?
当我尝试编辑时,它给了我只读文件。我是 scala/java 的新手。你能帮我理解如何做到这一点。
谢谢
解决方案
关于那篇文章,spark-avro库已被 DataBricks 弃用并捐赠给 Spark。
ABRiS库提供了一个 UDF 来将 的列转换Array[Byte]
为复杂类型的列,并最终转换为DataFrame。
在您的情况下,您应该首先进行几次转换。
import org.apache.spark.sql.DataFrame
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.examples.data.generation.AvroDataUtils
val spark: SparkSession = SparkSession
.builder().master("local[*]").getOrCreate()
// read data into an RDD of GenericRecord called "genericRecordRdd"
// Have your schema in string format in a variable called "stringSchema"
import spark.implicits._
val domainDF: DataFrame = genericRecordRdd
.map(AvroDataUtils.recordToBytes)
.toDF("value")
.select(from_avro(col("value"), stringSchema) as 'data).select("data.*")
AvroDataUtils.recordToBytes
是 ABRiS 库的一部分,可将对象GenericRecord
转换为Array[Byte]
. 然后你创建一个DataFrame
只有一列的,它被称为"value"
。此时,您已准备好使用from_avro
UDF。按照网站上的文档,您确实有其他选择,但根据您的描述,我认为这将是最接近的。
推荐阅读
- graphql - GraphQL:游标是读取事务的一部分吗?轮询新数据
- php - 帖子不是随机排序的
- python - rdkit 使用 MolToFile() 生成两个不同的图像并在 Jupyter 中显示
- core-data - 如何使用 SwiftUI 在模态视图中创建 NSManagedObject?
- ruby-on-rails - Howler.js 通过 Rails ActionCable 从 websocket 播放音频
- node.js - NodeJS 文件系统
- imagemagick - 在imagemagick中裁剪图像时如何设置偏移量
- html - 为什么使用百分比时我的图像高度不起作用?
- php - 创建对多维数组键的动态可解析调用,使其不被逐字处理
- java - 将 Json 发送到 REST API - 访问控制允许来源 [错误]