apache-kafka-connect - Confluent AvroSourceConnector
问题描述
我的用例:
我有一个 spoolDirAvroSourceConnector,它从存在多个 .avro 文件的目录中读取数据。每个都有自己的架构。
/path/input/
entity1.avro
entity2.avro
连接器配置:
{
"name": "avro_source_connector",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirAvroSourceConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "Topic",
"config.action.reload": "restart",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topic.creation.groups": "avro-source-grp",
"transforms.Topic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.Topic.field": "entity",
"topic": "test-topic",
"input.path": "/input/",
"finished.path": "/output/",
"error.path": "/failed/",
"input.file.pattern": "^file.*",
"halt.on.error": "true",
"topic.creation.default.partitions": "1",
"value.converter.schema.registry.url": "http://host:8081",
"topic.creation.default.replication.factor": "2",
"key.converter.schema.registry.url": "http://host:8081"
}
}
我正在使用转换,因为我需要从现场记录中查找主题。
问题:只有一个实体成功。其他实体抛出以下错误
Caused by: org.apache.kafka.connect.errors.ConnectException: org.apache.avro.AvroTypeException: Found entity2, expecting entity1, missing required field stateField
at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:249)
... 10 more
Caused by: org.apache.avro.AvroTypeException: Found entity2, expecting entity1, missing required field stateField
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
对此有什么帮助吗?
解决方案
推荐阅读
- javascript - 如何使用 javascript 关闭 file_browse 对话窗口
- angular6 - 如何从不同的域以角度 6 呈现 html 响应
- wpf - 在 wpf,MVVM 灯下,突然简单的 relaycommand 不起作用。我应该从哪里开始寻找?
- unit-testing - 在 AOT/release 或 profile 模式下运行颤振/飞镖测试?
- android - VoicePackage ID 是否保证稳定?
- javascript - 打字稿函数重载和缺少参数
- c++ - 将 32 位遗留代码移植到 64 位时如何处理不断变化的数据类型大小?
- java - 使用 GSON 在 Volley 中发送参数
- swift - 如何在 Controller 中使用两个按钮?每个按钮都有一个tableview?
- flutter - 如何在 Flutter 中重新加载 webview?