azure - Eventhub Stream 未捕获模式不匹配
问题描述
当我们从 eventthub 中读取事件时,我们正在尝试实现 badRecordsPath,作为尝试使其工作的示例,我已经放入了应该使事件失败的模式:
eventStreamDF = (spark.readStream
.format("eventhubs")
.options(**eventHubsConf)
.option("badRecordsPath", "/tmp/badRecordsPath/test1")
.schema(badSchema)
.load()
)
然而,这永远不会失败并且总是读取事件,这是数据块的 eventthub 的读取流的行为吗?目前的解决方法是根据我们自己的模式检查 inferSchema。
解决方案
EventHubs 中的数据模式是固定的(请参阅文档)(对于 Kafka 也是如此) - 实际有效负载始终编码为带有 name 的二进制字段,body
开发人员可以根据生产者之间的“联系”来解码此二进制有效负载(s) 数据和该数据的消费者。因此,即使您指定了架构和badRecordsPath
选项,也不会使用它们。
您将需要实现一些函数来解码来自 JSON 或其他东西的数据,例如,如果数据被破坏,则返回 null,然后您将有一个用于 null 值的过滤器将流拆分为两个子流 - 好 &坏数据。
推荐阅读
- javascript - Create a new object from a class name and pass the class name as string
- python - 如何将 F 统计量和 P 值放入表中?
- c - 贪心算法:“未使用的表达式结果”
- latex - 如何在乳胶的围兜文件中添加名称
- javascript - 即使不重新加载页面,如何使用 javascript 继续记录元素的高度
- sms - 是否合法短信
- javascript - 如何从数据库中获取保存的下拉值并使用 jquery 将保存的值绑定到下拉列表
- django - 在 include() 中指定命名空间而不提供 app_name 时出错
- vb.net - 将 API 集成到 vb.net 服务应用程序中以执行
- flask - Watson assistant deployment on Flask+WSGI server (gunicorn or wsgi)