首页 > 解决方案 > 通过 kafka-avro-console-producer 生成 Avro 序列化对象的问题

问题描述

我正在使用kafka -avro-console-producer二进制文件生成一条消息:

kafka-avro-console-producer --broker-list broker:9092 --topic example-topic --property schema.registry.url='http://schema-registry:8081 --property value.schema='{"type": "record","name": "test","fields": [{"name": "before", "type": ["null", {"type": "record", "name": "columns", "fields":[{"name": "name", "type": "string"}]}],"default": "null"},{"name": "after", "type": ["null", "columns"],"default": "null"}]}'
{"before": null,"after": {"name": "John"}}'

发送以下消息

{"before": null,"after": {"name": "John"}}

并通过应用以下Avro 架构

{
    "type": "record",
    "name": "test",
    "fields": [{
        "name": "before",
        "type": ["null", {
            "type": "record",
            "name": "columns",
            "fields": [{
                "name": "name",
                "type": "string"
            }]
        }],
        "default": "null"
    }, {
        "name": "after",
        "type": ["null", "columns"],
        "default": "null"
    }]
}

我得到的错误回报如下:

Caused by: org.apache.avro.AvroTypeException: Unknown union branch name
    at org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:445)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at io.confluent.kafka.formatter.AvroMessageReader.jsonToAvro(AvroMessageReader.java:213)
    at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:180)
    at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:55)
    at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

对于那些愿意深入兔子洞的人,我正在使用 Oracle Golden Gate 大数据连接器在 Oracle Golden Gate 和 Apache Kafka 之间进行集成。我目前遇到与此处描述的等效模型有关的问题:

https://www.ateam-oracle.com/oracle-goldengate-big-data-adapter-apache-kafka-producer

当尝试将上述网页中描述的架构应用到它的相应模型时(并且在完成 JSON 模型之后),我遇到的错误与我在问题中的模型和架​​构中遇到的错误相同。


非常感谢大家。

标签: apache-kafkaavrooracle-golden-gate

解决方案


这就是问题

"type": ["null", "columns"]

您不能参考其他记录类型。您需要像在其他领域一样扩展它


推荐阅读