首页 > 解决方案 > 使用 kafka lib 反序列化 PRIMITIVE AVRO KEY

问题描述

我目前无法反序列化KSTREAM APP 中的 avro PRIMITIVE 密钥

用 avro 模式编码的密钥(在模式注册表中注册),

当我使用 kafka-avro-console-consumer 时,我可以看到密钥已正确反序列化

但不可能让它在 KSTREAM 应用程序中工作

键的 avro 模式是一个 PRIMITIVE:

{"type":"string"}

我已经按照汇合的文档

final Serde<V> valueSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
valueSpecificAvroSerde.configure(serdeConfig, false);

final Serdes.StringSerde keySpecificAvroSerde = new Serdes.StringSerde();
keySpecificAvroSerde.configure(serdeConfig, true);

Consumed<String, totoAvro> inputConf = Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde);

final KStream<String, totoAvro> mystream = builder.stream("name topic", inputConf);

mystream.peek((key, value) -> logger.info("topic KEY :" + key))

它适用于该值,但键将是一个字符串,其中包含模式注册表中的字节,而不仅仅是“reel”键

https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format

所以字符串键是 /§/./11016015201 ,但我想要卷轴值:1016015201

如果我打印字符串中的字节,它是 [ 0x00 0x00 0x00 0x02 0x31 0x14 0x31 0x30 0x31 0x36 0x30 0x31 0x35 0x32 0x30 0x31 ]

标签: javaapache-kafkaavroapache-kafka-streamsconfluent-schema-registry

解决方案


更新

它现在正在工作:https ://stackoverflow.com/a/51957801/6227500

原始答案

该功能目前在模式注册表项目中不可用。

但是通过实现自定义 SERDE,您可以管理案例,

Thiyaga Rajan 提出了一个可行的实施方案

AVRO 原始类型的 Serde 类


推荐阅读