Confluent AvroSourceConnector

Problem description

My use case:

I have a SpoolDirAvroSourceConnector that reads from a directory containing multiple .avro files. Each file has its own schema.

/path/input/
  entity1.avro
  entity2.avro

Connector configuration:

{
  "name": "avro_source_connector",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirAvroSourceConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "Topic",
    "config.action.reload": "restart",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topic.creation.groups": "avro-source-grp",
    "transforms.Topic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.Topic.field": "entity",
    "topic": "test-topic",
    "input.path": "/input/",
    "finished.path": "/output/",
    "error.path": "/failed/",
    "input.file.pattern": "^file.*",
    "halt.on.error": "true",
    "topic.creation.default.partitions": "1",
    "value.converter.schema.registry.url": "http://host:8081",
    "topic.creation.default.replication.factor": "2",
    "key.converter.schema.registry.url": "http://host:8081"
  }
}

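I am using the transform because I need to derive the topic from a field in the record.

Problem: only one entity is processed successfully. The other entities throw the following error: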

Caused by: org.apache.kafka.connect.errors.ConnectException: org.apache.avro.AvroTypeException: Found entity2, expecting entity1, missing required field stateField
        at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:249)
        ... 10 more
Caused by: org.apache.avro.AvroTypeException: Found entity2, expecting entity1, missing required field stateField
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)

Any help with this?

Tags: apache-kafka-connect, confluent-platform, spooldir-avro-source-connector

Solution
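
No confirmed answer is recorded here. One plausible reading of the stack trace ("Found entity2, expecting entity1") is that a single task keeps expecting the schema of the first file while it reads a file written with a different schema. If that is the cause, a possible workaround is to run one connector per entity, each with an `input.file.pattern` that only matches that entity's files. The Python sketch below generates such definitions from a trimmed copy of the config above. The helper `connector_for`, the connector naming scheme, and the assumption that file names start with the entity name and end in `.avro` are all hypothetical; whether two connectors can safely share the same `input.path` is also untested here.

```python
import copy
import json
import re

# Trimmed copy of the connector config from the question. The remaining keys
# (converters, transforms, topic.creation.*) would be carried over unchanged.
BASE_CONFIG = {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirAvroSourceConnector",
    "tasks.max": "1",
    "topic": "test-topic",
    "input.path": "/input/",
    "finished.path": "/output/",
    "error.path": "/failed/",
    "halt.on.error": "true",
}


def connector_for(entity: str, base: dict = BASE_CONFIG) -> dict:
    """Build a connector definition that only picks up one entity's files.

    Hypothetical helper: the connector name and the file pattern are
    assumptions for illustration, not part of the original question.
    """
    config = copy.deepcopy(base)
    # Assumed naming convention: "<entity>...avro", e.g. "entity1.avro".
    config["input.file.pattern"] = "^" + re.escape(entity) + r".*\.avro$"
    return {"name": f"avro_source_connector_{entity}", "config": config}


if __name__ == "__main__":
    # Print one JSON definition per entity, ready to adapt and submit.
    for entity in ("entity1", "entity2"):
        print(json.dumps(connector_for(entity), indent=2))
```

Each generated definition could then be adapted and submitted to the Kafka Connect REST API like the original one. Note also that the pattern in the question, `^file.*`, would not match the file names `entity1.avro` and `entity2.avro` shown in the directory listing, so the real file names presumably differ from that listing.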

