首页 > 解决方案 > Kafka Connect API 和 Avro 对象(SourceRecord 与 org.apache.avro.Schema)

问题描述

我在使用 kafka 源连接器(需要准备 SourceRecord 实例)将 Avro 对象(org.apache.avro.specific.SpecificRecord 的实例)发送到 kafka 主题时遇到问题。就我而言,我假设基于模式,例如:

{
    "namespace": "com.model.avro.generated",
    "type": "record",
    "name": " MessageExVal",
    "version": "1",
    "fields": [
        {
            "name": "messageSource",
            "type": "string"
        },
        {
            "name": "messageSourceVersion",
            "type": [
                "string",
                "null"
            ]
        }
    ]
}

在 for maven的帮助下avro-maven-plugin,我将生成项目中使用的类模型。类的实例MessageExVal为我提供了“<code>org.apache.avro.Schema”(通过方法getSchema() or getClassSchema())。从另一面 kafka connect api 要求我org.apache.kafka.connect.data.Schema能够创建由源连接器方法SourceRecord返回的新实例。poll()在配置中,我提供参数:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",

AvroConverter在“poll”方法之后执行的in 方法的代码中fromConnectData(),我看到将完成从org.apache.kafka.connect.data.Schemato的转换。org.apache.avro.Schema那么是否有任何选项可以传递 avro 模式而不先将其转换为“连接版本”,因为后来它无论如何都会转换回 avro ?下面您可以在我所指的代码中找到带有注释点的 poll 方法的实现:

@Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new LinkedList<SourceRecord>();
        MessageExVal myValue = MessageExVal.newBuilder()
                .setMessageType(“some value”)
                .setMessageSource(“some other value”)
                .build();
        SourceRecord sr = new SourceRecord(null, null,
                "test_topic",
                myValue.getSchema(), //incorrect - different types
                myValue);
        records.add(sr);
        return records;
    }

总结一切,我的问题是如何使用 kafka connect SourceConnector 将“myValue”放入主题?我将非常感谢每一个提示:)

标签: javaapache-kafkaavroapache-kafka-connect

解决方案


因为后来它无论如何都会转换回 avro ?

数据以二进制形式存储在主题中,因此您仍然需要支付反序列化成本

kafka connect api 需要我 org.apache.kafka.connect.data.Schema 才能创建 SourceRecord 的新实例

是的。您可以使用它toConnectData来获取它,或者您可以从代码的依赖项中删除 Avro,并直接从 Connect 创建 Schema 和 Struct 实例。

转换器负责序列化,Connect 中不需要 Avro


推荐阅读