java - 如何从kafka avro记录生成pojo?
问题描述
我有一个 User 类,我将它序列化为 avro,(使用 Confluent avro 序列化程序和模式注册表)并将其发布到 Kafka 主题。我让消费者将数据打印到控制台,它工作正常。我现在尝试的是从这些数据中创建原始对象。例如,我将“用户”对象作为 avro 发布到 Kafka 主题。我正在尝试在使用后重新创建该用户对象(而不是控制台输出)。这可能吗?
下面是我的代码
用户类
public class User {
int id;
String name;
public User(){}
public User(int id, String name) {
super();
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
消费者代码
User user = new User();
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "avro-consumer-group");
props.put("key.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put("value.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put("schema.registry.url","http://127.0.0.1:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList("avrotesttopic"));
System.out.println("Subscribed to topic " + "avrotesttopic");
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (org.apache.kafka.clients.consumer.ConsumerRecord<String, GenericRecord> record : records){
System.out.printf("value = %sn",record.value());
//output-> value = {"id": 10, "name": "testName"}
}
}
谢谢
解决方案
考虑到您正在使用 KafkaAvroDeserializer,您需要将以下属性设置为您的使用者配置的一部分
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
它将允许您获得SpecificRecord
而不是GenericRecord
已经由消费者处理。这是一个例子
https://dzone.com/articles/kafka-avro-serialization-and-the-schema-registry
推荐阅读
- asp.net - 如何为同一页面实现两个不同的会话?
- php - 从php中具有相同名称的按钮获取ID
- laravel - Laravel 密码重置链接指向旧的 APP_URL
- javascript - WordPress 中的 Ajax 调用返回整个 HTML 页面作为响应
- google-chrome-devtools - Chrome 无法仅解析 HTTPS 的主机名故障排除
- r - 对一组行进行分组以找到平均值,然后使用 R 中的数据框将平均值重新分配给每一行
- android - 如何在 LazyColumn 列文本中的两个不同文本中交替颜色
- python-3.x - 为什么我的函数无限循环?(Python 3.x)[已解决]
- c - 在使用 fork 将某些内容写入程序中的管道之前,读取似乎不会阻塞
- javascript - jQuery:搜索名称没有空格