java - 带有模式注册表的融合 kafka 上的 Json 消息验证
问题描述
我正在使用带有 java 的 spring boot kafka 向 kafka 主题发送一条 json 消息。该消息具有在模式注册表上发布的 json 模式。
当生产者发送消息时,如何让 json 消息根据其模式进行验证?我可以看到 json 消息出现在主题上,并在模式注册表上看到模式。
生产者配置包括:
@Bean
public ProducerFactory<String, TestUser> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
configProps.put("schema.registry.url", "http://localhost:8081");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, TestUser> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
测试用户:
@NoArgsConstructor
@Data
public class TestUser {
@JsonProperty
public String firstName;
@JsonProperty(required = true)
public String lastName;
}
消息发送者:
public void sendMessage(TestUser user) {
ProducerRecord<String, TestUser> producerRecord = new ProducerRecord<>(fcsTopic,user.getFirstName(),user);
final ListenableFuture<SendResult<String, TestUser>> resultListenableFuture =
kafkaTemplate.send(producerRecord);
resultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, TestUser>>() {
@Override
public void onSuccess(SendResult<String, TestUser> result) {
System.out.println("Sent message=[" + user +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=["
+ user + "] due to : " + ex.getMessage());
}
});
}
架构注册表:
{
"subject": "test.user.topic-value",
"version": 1,
"id": 1,
"schemaType": "JSON",
"schema": "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Test User\",\"type\":\"object\",\"additionalProperties\":false,\"properties\":{\"firstName\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}]},\"lastName\":{\"type\":\"string\"}},\"required\":[\"lastName\"]}
}
该主题有 2 个验证配置可用于对消息应用验证:
我注意到当没有应用验证时,我可以发送一条消息并毫无问题地使用一条消息。消费者具有适当的反序列化器。
仅启用密钥验证时,confluent.key.schema.validation: true,当我发送消息时,我在代理日志中看到错误:
INFO Invalid record due to missing magic byte (io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator)
ERROR [ReplicaManager broker=1] Error processing append operation on partition dfm.fcs.csv.topic-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
- 当仅启用值验证时,confluent.value.schema.validation:我注意到如果 json 消息所需字段之一具有空值,它不会失败验证。
可以通过添加到 Producer 配置来修复和启用此值验证:
configProps.put(KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA, "true");
是否有任何其他配置可以正确启用 json 键和值消息验证?
谢谢,B。
解决方案
推荐阅读
- reactjs - 使用反应故事书的复杂布局
- macos - 使 k8s 集群服务可用于本地 docker 容器
- git - Bitbake 命令因发行版 fsl-imx-xwayland 而失败
- docker - docker 注册表前端(konradkleine/docker-registry-frontend)在单击存储库时出错,并且看不到 docker 图像
- r - 在 dbscan 结果中查找 k 最大的集群
- c++ - 尝试访问结构类型时出现 std::bad_variant_access 异常
- cytoscape - cytoscape 系统检查器在 mac catalina 上失败
- mysql - 如何在 SQL 中查找日期之间的总行数?
- arrays - F# 中的 Array.replicate 和 Array.create 之间有什么功能区别吗?
- javafx - 缩放后模糊的PDF