Azure Function App - Exception: AssertionError: Number of bodies and metadata mismatched

Problem description

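I am developing a Function App on Azure that processes messages sent to an Azure EventHub. The messages are produced by Debezium (CDC on PostgreSQL) and are in JSON format. Most of the code below works, but for one particular kind of message (deletes) I get an error that I don't understand: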

[2021-10-22T16:23:46.679Z] Worker process started and initialized.
[2021-10-22T16:23:50.220Z] Host lock lease acquired by instance ID '0000000000000000000000002A92F406'.
[2021-10-22T16:25:33.353Z] Executing 'Functions.testEventHubTrigger' (Reason='(null)', Id=ce31cdac-24d4-4ff1-a8d9-3f95bc8b760a)
[2021-10-22T16:25:33.355Z] Trigger Details: PartionId: 0, Offset: 12885023368-12885023368, EnqueueTimeUtc: 2021-10-22T16:25:33.2280000Z-2021-10-22T16:25:33.2280000Z, SequenceNumber: 228-228, Count: 1
[2021-10-22T16:25:33.417Z] New event detected
[2021-10-22T16:25:33.420Z] Event message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"departmentid"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"groupname"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"default":0,"field":"modifieddate"}],"optional":true,"name":"Adventureworks.humanresources.department.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"departmentid"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"groupname"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"default":0,"field":"modifieddate"}],"optional":true,"name":"Adventureworks.humanresources.department.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"
data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"Adventureworks.humanresources.department.Envelope"},"payload":{"before":{"departmentid":186,"name":"","groupname":"","modifieddate":0},"after":null,"source":{"version":"1.6.2.Final","connector":"postgresql","name":"Adventureworks","ts_ms":1634919842294,"snapshot":"false","db":"Adventureworks","sequence":"[\"1040891432\",\"1040891432\"]","schema":"humanresources","table":"department","txId":203668,"lsn":1040927480,"xmin":null},"op":"d","ts_ms":1634919933086,"transaction":null}}
[2021-10-22T16:25:33.439Z] Executed 'Functions.testEventHubTrigger' (Succeeded, Id=ce31cdac-24d4-4ff1-a8d9-3f95bc8b760a, Duration=94ms)
[2021-10-22T16:25:33.613Z] Executing 'Functions.testEventHubTrigger' (Reason='(null)', Id=a693051f-790c-4b90-af21-5792a386c7a1)
[2021-10-22T16:25:33.616Z] Trigger Details: PartionId: 0, Offset: 12885026456-12885032224, EnqueueTimeUtc: 2021-10-22T16:25:33.2280000Z-2021-10-22T16:25:33.3060000Z, SequenceNumber: 229-231, Count: 3
[2021-10-22T16:25:33.636Z] Executed 'Functions.testEventHubTrigger' (Failed, Id=a693051f-790c-4b90-af21-5792a386c7a1, Duration=23ms)
[2021-10-22T16:25:33.639Z] System.Private.CoreLib: Exception while executing function: Functions.testEventHubTrigger. System.Private.CoreLib: Result: Failure
Exception: AssertionError: Number of bodies and metadata mismatched
Stack:   File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.8/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 382, in _handle__invocation_request
    args[pb.name] = bindings.from_incoming_proto(
  File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.8/WINDOWS/X64\azure_functions_worker\bindings\meta.py", line 87, in from_incoming_proto
    return binding.decode(datum, trigger_metadata=metadata)
  File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.8/WINDOWS/X64\azure\functions\eventhub.py", line 108, in decode
    return cls.decode_multiple_events(data, trigger_metadata)
  File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.8/WINDOWS/X64\azure\functions\eventhub.py", line 159, in decode_multiple_events
    raise AssertionError('Number of bodies and metadata mismatched')
.

Below is my function code:

from typing import BinaryIO, List
import logging
import json
import psycopg2

import azure.functions as func

def main(events: List[func.EventHubEvent]):
    for event in events:
        logging.debug('New event detected')
        logging.debug('Event message: %s',
                        event.get_body().decode('utf-8'))
        row = json.loads(event.get_body().decode('utf-8'))

The error appears when the iterator advances in the for loop (perhaps because it tries to read something that does not exist?).

Please help me debug this.

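Edit: It seems that when a delete is performed on PostgreSQL, Debezium generates two messages: one with a payload, and a second one called a "tombstone" message (https://debezium.io/documentation/reference/1.7/connectors/postgresql.html). The second message is empty, which is most likely what causes the exception above. From the Debezium documentation: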

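PostgreSQL connector events are designed to work with Kafka log compaction. Log compaction allows some older messages to be removed as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.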

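Tombstone events: when a row is deleted, the delete event value still works with log compaction, because Kafka can remove all earlier messages that have the same key. However, for Kafka to remove all messages that have that key, the message value must be null. To make this possible, the PostgreSQL connector follows a delete event with a special tombstone event that has the same key but a null value.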
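To make the distinction above concrete, here is a minimal sketch that tells a tombstone apart from the delete event that precedes it. The helper name `classify_debezium_body` is hypothetical, and it assumes the null value of a tombstone arrives as `None`, an empty byte string, or a JSON `null`. Note that in the log above the AssertionError is raised inside the worker's `decode_multiple_events`, before `main` is called, so a check like this can only help once the body actually reaches the function.

```python
import json
from typing import Optional


def classify_debezium_body(body: Optional[bytes]) -> str:
    """Return 'tombstone', 'delete', or 'change' for a raw message body.

    Hypothetical helper, not part of azure.functions or Debezium.
    """
    # Assumption: a tombstone's null value shows up as None or b"".
    if not body:
        return "tombstone"

    envelope = json.loads(body.decode("utf-8"))
    if envelope is None:
        # The body was the JSON literal null.
        return "tombstone"

    # With schemas enabled (as in the logged message) the row data
    # sits under "payload"; otherwise the envelope is the payload.
    payload = envelope.get("payload", envelope)
    if payload is None:
        return "tombstone"

    # Debezium marks delete events with op == "d".
    if payload.get("op") == "d":
        return "delete"
    return "change"
```

Inside the loop, the result could be used to skip tombstones before calling `json.loads` on the body.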

Can anyone help me handle the empty message?

Tags: python, azure-functions, azure-eventhub, debezium

Solution
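One option that may help, assuming the connector configuration can be changed and nothing downstream relies on Kafka log compaction: Debezium connectors have a `tombstones.on.delete` property (default `true`), and setting it to `false` makes the connector emit only the delete event, with no empty tombstone after it. The sketch below only shows the property being applied to a configuration dictionary. The helper name `without_tombstones` and the abbreviated example config are illustrative, not taken from the question.

```python
import json
from typing import Dict


def without_tombstones(connector_config: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of a Debezium connector config with tombstones disabled."""
    updated = dict(connector_config)
    # Debezium connector property; when "false", a delete produces
    # only the delete event and no follow-up tombstone.
    updated["tombstones.on.delete"] = "false"
    return updated


# Hypothetical, abbreviated config; a real one also carries the
# database connection settings.
example = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.dbname": "Adventureworks",
}

print(json.dumps(without_tombstones(example), indent=2))
```

The resulting configuration would then be submitted to Kafka Connect in whatever way the connector was originally registered. Whether this removes the AssertionError in this setup has not been verified here.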

