scala - Spark 流式传输:非结构化记录
问题描述
我们正在使用流式处理来自EventHub的数据。传入流包含各种类型的JSON记录(大约 400 种不同类型)
每条记录都将使用ProductId属性进行分类。
示例(传入的记录流):
record1 - { productId: 101, colA: "some-val", colB: "some-val" }
record2 - { productId: 104, colC: "some-val", colD: "some-val" }
record3 - { productId: 202, colA: "some-val", colD: "some-val", colF: "some-val" }
record3 - { productId: 342, colH: "some-val", colJ: "some-val", colK: "some-val" }
每条记录中的属性数量各不相同,但具有相似productId的记录将具有完全相同数量的属性。
ProductId 范围为 (1 - 400),记录中的属性数量最多为 50。
我想阅读上面的 JSON 记录流并写入不同的parquet/delta 位置,例如
Location(Delta/Parquet) Records
-----------------------------------------------------------------
/mnt/product-101 Contains all records with productId - 101
/mnt/product-104 Contains all records with productId - 104
/mnt/product-202 Contains all records with productId - 202
/mnt/product-342 Contains all records with productId - 342
1) 如何从包含不同类型记录的流中创建 DataFrame/Dataset?
2)是否可以使用单个火花流并写入不同的增量/镶木地板位置?
解决方案
请注意,使用此方法应该可以工作,但它会生成大量稀疏数据。
首先创建一个包含所有列的 StructType。
val schema = new StructType().add("productId", LongType)
.add("colA", StringType).add("colB", StringType)
.add("colC", StringType)....
然后使用此模式和 from_json 函数创建一个流。
val df1 = df.select(from_json('value, schema).alias("tmp"))
.select("tmp.*")
最后使用 partitionBy 写入分区 parquet 文件。
val query1 = df1
.writeStream
.format("parquet")
.option("path", "/mnt/product/")
.option("checkpointLocation","/tmp/checkpoint")
.partitionBy("productId").start()
这将生成包含所有列的行。最初不在 json 输入中的列将被标记为空。Parquet 支持写入空值。但是如果你先过滤它们会更好。
推荐阅读
- javascript - 如何删除附加到 auth0-lock 的侦听器?
- gcc - Nix 和 GCC - `找不到 crt1.o`
- java - 序列化包含数据集的对象
- ios - iOS PDFKit 文本字段表单输入
- node.js - dialogflow fullfilment 和 firebase 响应时间
- reactjs - 使用 ReactJS 添加静态图像
- javascript - 如何在电子应用程序中使用快递
- php - wordpress 主题未安装权限被拒绝无法创建目录。在本地主机上的我的 macbook air
- angular - 同时使用 ngFor 和 ngIf 列出项目
- javascript - VueJS data property returns undefined in mounted function