apache-kafka - 通过 kafka-avro-console-producer 生成 Avro 序列化对象的问题
问题描述
我正在使用kafka -avro-console-producer二进制文件生成一条消息:
kafka-avro-console-producer --broker-list broker:9092 --topic example-topic --property schema.registry.url='http://schema-registry:8081 --property value.schema='{"type": "record","name": "test","fields": [{"name": "before", "type": ["null", {"type": "record", "name": "columns", "fields":[{"name": "name", "type": "string"}]}],"default": "null"},{"name": "after", "type": ["null", "columns"],"default": "null"}]}'
{"before": null,"after": {"name": "John"}}'
发送以下消息:
{"before": null,"after": {"name": "John"}}
并通过应用以下Avro 架构:
{
"type": "record",
"name": "test",
"fields": [{
"name": "before",
"type": ["null", {
"type": "record",
"name": "columns",
"fields": [{
"name": "name",
"type": "string"
}]
}],
"default": "null"
}, {
"name": "after",
"type": ["null", "columns"],
"default": "null"
}]
}
我得到的错误回报如下:
Caused by: org.apache.avro.AvroTypeException: Unknown union branch name
at org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:445)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at io.confluent.kafka.formatter.AvroMessageReader.jsonToAvro(AvroMessageReader.java:213)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:180)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:55)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
对于那些愿意深入兔子洞的人,我正在使用 Oracle Golden Gate 大数据连接器在 Oracle Golden Gate 和 Apache Kafka 之间进行集成。我目前遇到与此处描述的等效模型有关的问题:
https://www.ateam-oracle.com/oracle-goldengate-big-data-adapter-apache-kafka-producer
当尝试将上述网页中描述的架构应用到它的相应模型时(并且在完成 JSON 模型之后),我遇到的错误与我在问题中的模型和架构中遇到的错误相同。
非常感谢大家。
解决方案
这就是问题
"type": ["null", "columns"]
您不能参考其他记录类型。您需要像在其他领域一样扩展它
推荐阅读
- ios - 应用程序从后台返回后位置不更新
- javascript - 在 React 中的父对象内循环对象
- javascript - 如何使用 restify 提供静态文件?
- docker - 从私有注册表中提取失败 - 不支持的 docker v1 存储库请求
- php - Nginx + php7.0-fpm = 空白页
- qt - QObject:无法为不同线程中的父级创建子级:父级线程:QThread(0x221f650),当前线程:QThread(0x23a7950)
- c# - 当嵌入式 DLL 需要时强制加载 .Extensions 命名空间
- makefile - 通过 make 文件自动执行版本号
- sql - SQL Server 2012 从 CSV 批量插入临时表
- python - Keras:需要验证拆分的回调?