java - AVRO 原始类型的 Serde 类
问题描述
我正在用 Java 编写一个 Kafka 流应用程序,该应用程序接受由连接器创建的输入主题,该连接器使用模式注册表和 avro 作为键和值转换器。连接器产生以下模式:
key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
{"name": "firstname", "type": "string"},
{"name": "lastname", "type": "string"}
]}
实际上,有几个主题,键模式总是“int”,值模式总是某种记录(用户、产品等)。我的代码包含以下定义
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);
起初,我尝试使用类似的东西来使用该主题,
Consumed.with(Serdes.Integer(), userSerde);
但这不起作用,因为 Serdes.Integer() 期望整数使用 4 个字节进行编码,但 avro 使用可变长度编码。使用Consumed.with(Serdes.Bytes(), userSerde);
有效,但我真的想要 int 而不是 bytes 所以我将代码更改为此
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true);
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);
这使编译器产生警告(它不喜欢(Serde<Integer>)(Serde)
强制转换)但它允许我使用
Consumed.with(keySerde, userSerde);
并获取一个整数作为键。这工作得很好,我的应用程序按预期运行(太棒了!!!)。但是现在我想为键/值定义默认的 serde,但我无法让它工作。
设置默认值 serde 很简单:
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
但是我无法弄清楚如何定义默认密钥 serde。
我试过了
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName());
产生运行时错误:找不到 org.apache.kafka.common.serialization.Serdes$WrapperSerde 的公共无参数构造函数streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
产生运行时错误:java.lang.Integer 不能转换为 org.apache.avro.specific.SpecificRecord
我错过了什么?谢谢。
解决方案
更新 (版本 5.5 及更高版本)
Confluent 版本5.5
通过PrimitiveAvroSerde
(参见https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams /serdes/avro/PrimitiveAvroSerde.java )
原始答案 (5.4 及更早版本):
这是一个已知问题。原始 Avro 类型不能很好地与 Confluent 的 AvroSerdes 配合使用,因为 SerdesGenericAvroRecord
只能配合SpecificAvroRecord
使用。
因此,基于KafkaAvroSerializer
并且构建您自己的 SerdeKafkaAvroDeserializer
是正确的方法。为了能够将其作为默认 Serde 传递到配置中,您不能使用Serdes.serdeFrom
,因为类型信息由于泛型类型擦除而丢失。
但是,您可以实现自己的扩展Serde
接口的类,并将自定义类传递到配置中:
public class MySerde extends Serde<Integer> {
// use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer`
}
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);
推荐阅读
- autodesk-forge - 是否有必要卸载加载的模型,或者我应该从主 forgeViewer 标签中删除所有标签?
- elasticsearch - 响应中未保留的 agg 名称的排序
- arrays - 从 Swiftui 中的变量获取对象的特定属性
- visual-studio - 在 Visual Code Studio 中哪里可以找到 Flutter 布局检查器?
- javascript - 如何根据 Vue JS 中 for 循环中的布尔值显示图像?
- python - “自动”包的 pip 安装错误
- winapi - Winapi 如何将矩形绘制到特定的窗口句柄?
- sql - 在 CASE 语句中聚合列
- node.js - Github Actions 组织节点部署到 github 页面
- catalyst - 无论如何,我可以获得 Cat4k 交换机的所有 cli 命令的列表吗?