apache-kafka - MockSchemaRegistryClient 未注册 avro 架构:无法从架构注册表中获取架构
问题描述
我正在使用 spring-kafka-test 2.6.3EmbeddedKafka
和 Junit 5 为使用 avro 消息的拓扑编写 Spring Boot 集成测试。在我正在使用的测试中MockSchemaReigstryClient
我正在注册模拟模式客户端并按照此PR中的建议配置主题,该 PR 现在已关闭。但我收到以下错误:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 1
Caused by: java.io.IOException: Cannot get schema from schema registry!
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:176) ~[kafka-schema-registry-client-6.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndId(MockSchemaRegistryClient.java:232) ~[kafka-schema-registry-client-6.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaById(MockSchemaRegistryClient.java:215) ~[kafka-schema-registry-client-6.0.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:279) ~[kafka-avro-serializer-6.0.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:98) ~[kafka-avro-serializer-6.0.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:77) ~[kafka-avro-serializer-6.0.1.jar:na]
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-6.0.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66) ~[kafka-streams-avro-serde-6.0.1.jar:na]
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38) ~[kafka-streams-avro-serde-6.0.1.jar:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:55) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:865) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:938) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:640) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) ~[kafka-streams-2.6.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) ~[kafka-streams-2.6.0.jar:na]
生产代码工作正常。所以,似乎我在测试设置中遗漏了一些东西。任何指针将不胜感激。这是代码的要点。
更新:我正在使用 kafka-schema-registry-client-6.0.1 maven 依赖项。
解决方案
您的 StreamsConfig 仍然使用默认值SpecificAvroSerde
(是的),MockSchemRegistryClient
因为您配置了,schemaRegistryUrl : mock://
但它没有 serde 所需的模式。这MockSchemRegistryClient
是与您在测试中为生产者和消费者配置的实例不同的实例。
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.class.getName());
考虑这个流程:(1) 测试生产者 --> (2) IN_TOPIC -> (3) StreamApp -> (4) OUT_TOPIC -> (5) 测试消费者。
您已经MockSchemaRegistryClient
在 (1) 处为生产者配置,在 (5) 处为消费者配置,甚至注册了所需的模式,但在 (3) 处还没有为一个(主应用程序)配置。因此错误:
Caused by: java.io.IOException: Cannot get schema from schema registry!
解决此问题的一种方法是使用MockSpecificAvroSerde
指向具有测试模式的注册表的 a :
application.yaml
:
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
specific.avro.reader: true
default:
value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
但是对于application-test.yaml
:
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
schema.registry.url: mock://localtest
specific.avro.reader: true
default:
value.serde: your.custom.serde.that.uses.a.mock.schema.regiter.client.bydefault.MockSpecificAvroSerde
推荐阅读
- sqlplus - 所以我试图从一个表中过滤重复项,以便它只显示一个版本的数据组合
- python - Numpy Python中的多维索引错误
- sql - 在单个 SQL 查询中更新多个列
- css - 我可以在背景中定位和调整线性渐变吗?
- html - 让背景变色按钮工作
- python - How to create forecasted values chart using openpyxl?
- intellij-idea - 从 grails 2.0.8 升级时,将源 java 文件放在 grails 4.0.8 的哪里?
- odoo-14 - 如何从 JS 在 Odoo 中渲染猫头鹰模板?
- python - 无法在 python 中以正常格式转换此文本?
- javascript - 如何根据用户选择下拉值创建动态字段