首页 > 解决方案 > Eventhub Stream 未捕获模式不匹配

问题描述

当我们从 eventthub 中读取事件时,我们正在尝试实现 badRecordsPath,作为尝试使其工作的示例,我已经放入了应该使事件失败的模式:

eventStreamDF = (spark.readStream
  .format("eventhubs")
  .options(**eventHubsConf)
  .option("badRecordsPath", "/tmp/badRecordsPath/test1")
  .schema(badSchema)
  .load()
) 

然而,这永远不会失败并且总是读取事件,这是数据块的 eventthub 的读取流的行为吗?目前的解决方法是根据我们自己的模式检查 inferSchema。

标签: azuredatabricksazure-databricksazure-eventhub

解决方案


EventHubs 中的数据模式是固定的(请参阅文档)(对于 Kafka 也是如此) - 实际有效负载始终编码为带有 name 的二进制字段,body开发人员可以根据生产者之间的“联系”来解码此二进制有效负载(s) 数据和该数据的消费者。因此,即使您指定了架构和badRecordsPath选项,也不会使用它们。

您将需要实现一些函数来解码来自 JSON 或其他东西的数据,例如,如果数据被破坏,则返回 null,然后您将有一个用于 null 值的过滤器将流拆分为两个子流 - 好 &坏数据。


推荐阅读