java - Kafka 生产者如何找到记录的模式 ID
问题描述
在 Kafka 生产者中,我可以看到我们只需要指定模式注册表 url 而不是我想要的模式。因此,在记录序列化时,生产者如何决定使用哪个模式。因为模式注册表可以托管多个模式。
https://dzone.com/articles/kafka-avro-serialization-and-the-schema-registry
请参阅上面网址中给出的以下示例。在这里,我看不到架构 ID,而只有注册表 url。那么生产者如何找到正确的模式?
公共类 AvroProducer {
private static Producer<Long, Employee> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
LongSerializer.class.getName());
// Configure the KafkaAvroSerializer.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class.getName());
// Schema Registry location.
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://localhost:8081");
return new KafkaProducer<>(props);
}
private final static String TOPIC = "new-employees";
public static void main(String... args) {
Producer<Long, Employee> producer = createProducer();
Employee bob = Employee.newBuilder().setAge(35)
.setFirstName("Bob")
.setLastName("Jones")
.setPhoneNumber(
PhoneNumber.newBuilder()
.setAreaCode("301")
.setCountryCode("1")
.setPrefix("555")
.setNumber("1234")
.build())
.build();
IntStream.range(1, 100).forEach(index->{
producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob));
});
producer.flush();
producer.close();
}
}
解决方案
您发送的 POJO 类被序列化以包含模式,该模式在二进制数据有效负载发送到 Kafka 之前以纯文本形式发送到注册表。
当在服务器端接收到注册表 HTTP 请求时,它使用 MD5 哈希对一些内部哈希映射来检查/插入模式文本哈希以及主题/Avro 记录(注册表“主题”)名称,然后返回数字 ID到 HTTP 响应中的序列化程序以创建 Kafka 消息有效负载。
如果您想了解更多信息,Schema Registry 代码是开源的
推荐阅读
- python - Pandas 处理大型 CSV 数据
- ios - 不支持将桥接头与模块接口一起使用 Command CompileSwiftSources failed with a nonzero exit code
- linux - 在 Linux 中找出两个目录之间丢失的文件(缺少文件名,但不是扩展名)
- ibm-cloud - 自然语言分类器教程 - 发生 500 错误
- python - 尝试利用库进行一些主题建模,但进展不顺利
- c# - 将数组一分为二,以找到获得相等或几乎相等的整数和的最佳解决方案
- python - 值在数据库上时注册帐户(窗口)拒绝(MySQL)
- java - 如何在android中使用圆角矩形自定义图像视图
- azure-devops - 保留某些版本并无限期发布
- javascript - 如何使用javascript在d3js中更改强制有向图中节点的颜色?