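python - Azure Function App - Exception: AssertionError: Number of bodies and metadata mismatched
Problem description
I am developing a Function App on Azure that processes messages sent to Azure EventHub. The messages are generated by Debezium (CDC on PostgreSQL) and are in JSON format. Most of the code below works without problems, but for one specific kind of message (a delete) I get an error that I do not understand: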
[2021-10-22T16:23:46.679Z] Worker process started and initialized.
[2021-10-22T16:23:50.220Z] Host lock lease acquired by instance ID '0000000000000000000000002A92F406'.
[2021-10-22T16:25:33.353Z] Executing 'Functions.testEventHubTrigger' (Reason='(null)', Id=ce31cdac-24d4-4ff1-a8d9-3f95bc8b760a)
[2021-10-22T16:25:33.355Z] Trigger Details: PartionId: 0, Offset: 12885023368-12885023368, EnqueueTimeUtc: 2021-10-22T16:25:33.2280000Z-2021-10-22T16:25:33.2280000Z, SequenceNumber: 228-228, Count: 1
[2021-10-22T16:25:33.417Z] New event detected
[2021-10-22T16:25:33.420Z] Event message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"departmentid"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"groupname"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"default":0,"field":"modifieddate"}],"optional":true,"name":"Adventureworks.humanresources.department.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"departmentid"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"groupname"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"default":0,"field":"modifieddate"}],"optional":true,"name":"Adventureworks.humanresources.department.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"
data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"Adventureworks.humanresources.department.Envelope"},"payload":{"before":{"departmentid":186,"name":"","groupname":"","modifieddate":0},"after":null,"source":{"version":"1.6.2.Final","connector":"postgresql","name":"Adventureworks","ts_ms":1634919842294,"snapshot":"false","db":"Adventureworks","sequence":"[\"1040891432\",\"1040891432\"]","schema":"humanresources","table":"department","txId":203668,"lsn":1040927480,"xmin":null},"op":"d","ts_ms":1634919933086,"transaction":null}}
[2021-10-22T16:25:33.439Z] Executed 'Functions.testEventHubTrigger' (Succeeded, Id=ce31cdac-24d4-4ff1-a8d9-3f95bc8b760a, Duration=94ms)
[2021-10-22T16:25:33.613Z] Executing 'Functions.testEventHubTrigger' (Reason='(null)', Id=a693051f-790c-4b90-af21-5792a386c7a1)
[2021-10-22T16:25:33.616Z] Trigger Details: PartionId: 0, Offset: 12885026456-12885032224, EnqueueTimeUtc: 2021-10-22T16:25:33.2280000Z-2021-10-22T16:25:33.3060000Z, SequenceNumber: 229-231, Count: 3
[2021-10-22T16:25:33.636Z] Executed 'Functions.testEventHubTrigger' (Failed, Id=a693051f-790c-4b90-af21-5792a386c7a1, Duration=23ms)
[2021-10-22T16:25:33.639Z] System.Private.CoreLib: Exception while executing function: Functions.testEventHubTrigger. System.Private.CoreLib: Result: Failure
Exception: AssertionError: Number of bodies and metadata mismatched
Stack: File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.8/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 382, in _handle__invocation_request
args[pb.name] = bindings.from_incoming_proto(
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.8/WINDOWS/X64\azure_functions_worker\bindings\meta.py", line 87, in from_incoming_proto
return binding.decode(datum, trigger_metadata=metadata)
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.8/WINDOWS/X64\azure\functions\eventhub.py", line 108, in decode
return cls.decode_multiple_events(data, trigger_metadata)
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.8/WINDOWS/X64\azure\functions\eventhub.py", line 159, in decode_multiple_events
raise AssertionError('Number of bodies and metadata mismatched')
.
Here is my function code:
from typing import BinaryIO, List
import logging
import json
import psycopg2
import azure.functions as func


def main(events: List[func.EventHubEvent]):
    for event in events:
        logging.debug('New event detected')
        logging.debug('Event message: %s',
                      event.get_body().decode('utf-8'))
        row = json.loads(event.get_body().decode('utf-8'))
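The error seems to occur when the iterator advances in the for loop (perhaps because it tries to read something that does not exist?).
Please help me debug this...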
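Edit: It seems that when a delete is executed on PostgreSQL, Debezium generates two messages: one with a payload, and a second one called a "tombstone" message (https://debezium.io/documentation/reference/1.7/connectors/postgresql.html). The second message is empty, and this is most likely what causes the exception above. From the documentation:
PostgreSQL connector events are designed to work with Kafka log compaction. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.
Tombstone events: When a row is deleted, the delete event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be null. To make this possible, the PostgreSQL connector follows a delete event with a special tombstone event that has the same key but a null value.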
Can anyone help me handle the empty message?
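As a starting point, the tombstone described above could be recognized by its missing value before the JSON is parsed. The sketch below is only an illustration: `parse_debezium_body` is a hypothetical helper, and the delete event is an abbreviated version of the one in the log. Note that in the stack trace the assertion is raised inside the worker's `decode_multiple_events`, before `main` is called, so a guard like this only helps once the batch actually reaches the function.

```python
import json
from typing import Optional


def parse_debezium_body(body: Optional[bytes]) -> Optional[dict]:
    """Return the decoded Debezium envelope, or None for a tombstone.

    Hypothetical helper: a tombstone has no value, so None, an empty
    body and a JSON null are all treated as "nothing to process".
    """
    if not body or not body.strip():
        return None
    envelope = json.loads(body.decode("utf-8"))
    if envelope is None:
        return None
    return envelope


# Abbreviated delete event modeled on the log above, followed by its tombstone.
delete_body = json.dumps({
    "payload": {
        "before": {"departmentid": 186},
        "after": None,
        "op": "d",
    }
}).encode("utf-8")
tombstone_body = b""

for body in (delete_body, tombstone_body):
    envelope = parse_debezium_body(body)
    if envelope is None:
        print("tombstone, skipped")
        continue
    print("op =", envelope["payload"]["op"])
```

Inside the function this would replace the direct `json.loads(event.get_body().decode('utf-8'))` call, with a `continue` when the helper returns `None`.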
Solution
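One possible approach, assuming nothing downstream relies on Kafka log compaction, is to stop Debezium from emitting tombstones in the first place. The Debezium PostgreSQL connector has a `tombstones.on.delete` property (default `true`); setting it to `false` leaves only the delete event with `"op":"d"`. The sketch below builds a connector registration payload of the kind sent to the Kafka Connect REST API. The connector name is a placeholder, and the required connection settings (host, user, password and so on) are omitted because they are not given in the question.

```python
import json

# Hypothetical registration payload; the connector name is a placeholder and
# the connection settings a real deployment needs are left out on purpose.
connector = {
    "name": "adventureworks-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        # Values taken from the "source" block of the logged event.
        "database.dbname": "Adventureworks",
        "database.server.name": "Adventureworks",
        # Emit only the delete event, without the follow-up tombstone.
        "tombstones.on.delete": "false",
    },
}

print(json.dumps(connector, indent=2))
```

If the tombstones have to be kept, another thing to try is setting `"cardinality": "one"` on the Event Hub trigger in function.json and accepting a single `func.EventHubEvent` instead of a list. Whether the worker then decodes an empty body cleanly is not confirmed here and would need to be tested.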