首页 > 解决方案 > Spark 流式传输:非结构化记录

问题描述

我们正在使用流式处理来自EventHub的数据。传入流包含各种类型的JSON记录(大约 400 种不同类型)

每条记录都将使用ProductId属性进行分类。

示例(传入的记录流):

record1 - { productId: 101, colA: "some-val", colB: "some-val" }
record2 - { productId: 104, colC: "some-val", colD: "some-val" }
record3 - { productId: 202, colA: "some-val", colD: "some-val", colF: "some-val" }
record3 - { productId: 342, colH: "some-val", colJ: "some-val", colK: "some-val" }

每条记录中的属性数量各不相同,但具有相似productId的记录将具有完全相同数量的属性。

ProductId 范围为 (1 - 400),记录中的属性数量最多为 50。

我想阅读上面的 JSON 记录流并写入不同的parquet/delta 位置,例如

    Location(Delta/Parquet)             Records
    -----------------------------------------------------------------
    /mnt/product-101        Contains all records with productId - 101
    /mnt/product-104        Contains all records with productId - 104
    /mnt/product-202        Contains all records with productId - 202
    /mnt/product-342        Contains all records with productId - 342

1) 如何从包含不同类型记录的流中创建 DataFrame/Dataset?

2)是否可以使用单个火花流并写入不同的增量/镶木地板位置?

标签: scalaapache-sparkspark-streamingdatabricksspark-structured-streaming

解决方案


请注意,使用此方法应该可以工作,但它会生成大量稀疏数据。

首先创建一个包含所有列的 StructType。

  val schema = new StructType().add("productId", LongType)
  .add("colA", StringType).add("colB", StringType)
  .add("colC", StringType)....

然后使用此模式和 from_json 函数创建一个流。

val df1 = df.select(from_json('value, schema).alias("tmp"))
.select("tmp.*")

最后使用 partitionBy 写入分区 parquet 文件。

val query1 = df1 
.writeStream 
.format("parquet") 
.option("path", "/mnt/product/") 
.option("checkpointLocation","/tmp/checkpoint")
.partitionBy("productId").start()

这将生成包含所有列的行。最初不在 json 输入中的列将被标记为空。Parquet 支持写入空值。但是如果你先过滤它们会更好。


推荐阅读