首页 > 解决方案 > 带有模式注册表的融合 kafka 上的 Json 消息验证

问题描述

我正在使用带有 java 的 spring boot kafka 向 kafka 主题发送一条 json 消息。该消息具有在模式注册表上发布的 json 模式。

当生产者发送消息时,如何让 json 消息根据其模式进行验证?我可以看到 json 消息出现在主题上,并在模式注册表上看到模式。

生产者配置包括:

   @Bean
    public ProducerFactory<String, TestUser> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);

        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
        configProps.put("schema.registry.url", "http://localhost:8081");
       
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, TestUser> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }

测试用户:

@NoArgsConstructor
@Data
public class TestUser {
    @JsonProperty
    public String firstName;

    @JsonProperty(required = true)
    public String lastName;
}

消息发送者:

public void sendMessage(TestUser user) {

        ProducerRecord<String, TestUser> producerRecord = new ProducerRecord<>(fcsTopic,user.getFirstName(),user);
        final ListenableFuture<SendResult<String, TestUser>> resultListenableFuture =
                kafkaTemplate.send(producerRecord);

        resultListenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, TestUser>>() {
            @Override
            public void onSuccess(SendResult<String, TestUser> result) {
                System.out.println("Sent message=[" + user +
                        "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send message=["
                        + user + "] due to : " + ex.getMessage());
            }
        });
    }

架构注册表:

{
        "subject": "test.user.topic-value",
        "version": 1,
        "id": 1,
        "schemaType": "JSON",
        "schema": "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Test User\",\"type\":\"object\",\"additionalProperties\":false,\"properties\":{\"firstName\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}]},\"lastName\":{\"type\":\"string\"}},\"required\":[\"lastName\"]}
 }

该主题有 2 个验证配置可用于对消息应用验证:

  • confluent.key.schema.validation
  • confluent.value.schema.validation

    1. 我注意到当没有应用验证时,我可以发送一条消息并毫无问题地使用一条消息。消费者具有适当的反序列化器。

    2. 仅启用密钥验证时,confluent.key.schema.validation: true,当我发送消息时,我在代理日志中看到错误:

    INFO Invalid record due to missing magic byte (io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator)
    ERROR [ReplicaManager broker=1] Error processing append operation on partition dfm.fcs.csv.topic-0 (kafka.server.ReplicaManager)
    org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
    
    1. 当仅启用值验证时,confluent.value.schema.validation:我注意到如果 json 消息所需字段之一具有值,它不会失败验证。

    可以通过添加到 Producer 配置来修复和启用此值验证:

     configProps.put(KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA, "true");
    

    是否有任何其他配置可以正确启用 json 键和值消息验证?

    谢谢,B。

    标签: javaapache-kafkajsonschemaconfluent-platformconfluent-schema-registry

    解决方案


    推荐阅读