首页 > 解决方案 > Kafka 生产者如何找到记录的模式 ID

问题描述

在 Kafka 生产者中,我可以看到我们只需要指定模式注册表 url 而不是我想要的模式。因此,在记录序列化时,生产者如何决定使用哪个模式。因为模式注册表可以托管多个模式。

https://dzone.com/articles/kafka-avro-serialization-and-the-schema-registry

请参阅上面网址中给出的以下示例。在这里,我看不到架构 ID,而只有注册表 url。那么生产者如何找到正确的模式?

公共类 AvroProducer {

private static Producer<Long, Employee> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            LongSerializer.class.getName());

    // Configure the KafkaAvroSerializer.
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            KafkaAvroSerializer.class.getName());

    // Schema Registry location.
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8081");

    return new KafkaProducer<>(props);
}

private final static String TOPIC = "new-employees";

public static void main(String... args) {

    Producer<Long, Employee> producer = createProducer();

    Employee bob = Employee.newBuilder().setAge(35)
            .setFirstName("Bob")
            .setLastName("Jones")
            .setPhoneNumber(
                    PhoneNumber.newBuilder()
                            .setAreaCode("301")
                            .setCountryCode("1")
                            .setPrefix("555")
                            .setNumber("1234")
                            .build())
            .build();

    IntStream.range(1, 100).forEach(index->{
        producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob));

    });

    producer.flush();
    producer.close();
}

}

标签: javaapache-kafkaavroconfluent-schema-registry

解决方案


您发送的 POJO 类被序列化以包含模式,该模式在二进制数据有效负载发送到 Kafka 之前以纯文本形式发送到注册表。

当在服务器端接收到注册表 HTTP 请求时,它使用 MD5 哈希对一些内部哈希映射来检查/插入模式文本哈希以及主题/Avro 记录(注册表“主题”)名称,然后返回数字 ID到 HTTP 响应中的序列化程序以创建 Kafka 消息有效负载。

如果您想了解更多信息,Schema Registry 代码是开源的


推荐阅读