java - Kafka Connect API 和 Avro 对象(SourceRecord 与 org.apache.avro.Schema)
问题描述
我在使用 kafka 源连接器(需要准备 SourceRecord 实例)将 Avro 对象(org.apache.avro.specific.SpecificRecord 的实例)发送到 kafka 主题时遇到问题。就我而言,我假设基于模式,例如:
{
"namespace": "com.model.avro.generated",
"type": "record",
"name": " MessageExVal",
"version": "1",
"fields": [
{
"name": "messageSource",
"type": "string"
},
{
"name": "messageSourceVersion",
"type": [
"string",
"null"
]
}
]
}
在 for maven的帮助下avro-maven-plugin
,我将生成项目中使用的类模型。类的实例MessageExVal
为我提供了“<code>org.apache.avro.Schema”(通过方法getSchema() or getClassSchema()
)。从另一面 kafka connect api 要求我org.apache.kafka.connect.data.Schema
能够创建由源连接器方法SourceRecord
返回的新实例。poll()
在配置中,我提供参数:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
AvroConverter
在“poll”方法之后执行的in 方法的代码中fromConnectData()
,我看到将完成从org.apache.kafka.connect.data.Schema
to的转换。org.apache.avro.Schema
那么是否有任何选项可以传递 avro 模式而不先将其转换为“连接版本”,因为后来它无论如何都会转换回 avro ?下面您可以在我所指的代码中找到带有注释点的 poll 方法的实现:
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new LinkedList<SourceRecord>();
MessageExVal myValue = MessageExVal.newBuilder()
.setMessageType(“some value”)
.setMessageSource(“some other value”)
.build();
SourceRecord sr = new SourceRecord(null, null,
"test_topic",
myValue.getSchema(), //incorrect - different types
myValue);
records.add(sr);
return records;
}
总结一切,我的问题是如何使用 kafka connect SourceConnector 将“myValue”放入主题?我将非常感谢每一个提示:)
解决方案
因为后来它无论如何都会转换回 avro ?
数据以二进制形式存储在主题中,因此您仍然需要支付反序列化成本
kafka connect api 需要我 org.apache.kafka.connect.data.Schema 才能创建 SourceRecord 的新实例
是的。您可以使用它toConnectData
来获取它,或者您可以从代码的依赖项中删除 Avro,并直接从 Connect 创建 Schema 和 Struct 实例。
转换器负责序列化,Connect 中不需要 Avro
推荐阅读
- java - 如何使 GridLayout 对所有设备通用?
- angular-router - 如果什么都没有,有没有办法在角度路由器历史记录中添加一个页面?
- java - Redirect to an url but showing a different url in the bar adress using tomcat
- python - Error in Celery: "ModuleNotFoundError: No module named 'tasks'"
- html - 具有最小宽度并适应内容的 Flexbox 项目
- javascript - InAppBrowser loadstart 事件未触发/范围问题
- excel - 基于两列数据创建具有多个级别的层次结构
- bootstrap-4 - Bootstrap 4列中断不均匀
- c++ - 从线程 C# 中使用 VC++ 6.0 COM DLL
- visual-studio - 显示所有未添加到 .csproj 文件中的文件夹和文件