java - 从java中的主题读取数据时没有收到avro消息
问题描述
我第一次在 java 中编写代码来使用来自 kafka 主题的 AVRO 数据。我正在使用 kafka-avro-console-producer 来制作记录。我在 Docker 上使用 leneseio/fast-data-dev 映像来升级 kafka 堆栈。
生产记录:
root@fast-data-dev / $ kafka-avro-console-producer --broker-list localhost:9092 --topic payengine --property schema.registry.url=http://localhost:8081 --property value.schema='{"type":"record", "name":"payengine", "fields":[{"name":"tin", "type":"string"},{"name":"ach","type":"string"}] }'
{"tin":"61582","ach":"I"}
{"tin":"97820","ach":"I"}
现在,为了阅读这条记录,我写了下面的代码。此外,似乎我不必在使用记录时引用架构(如下面的参考链接中所述)。我还经历了一个示例,其中使用 SpecificAvroRecord 代替 GenericRecord,但这需要基于模式构建类。我不确定 GenericRecord 如何指向正确的架构,但在示例中看不到任何架构引用。
package com.github.psingh.Kafka;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer_AvroSchema {
public static void main(String[] args) {
// System.out.println("Hello Kafka ");
// setting properties
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
//name topic
String topic = "payengine";
// create the consumer
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
//subscribe to topic
consumer.subscribe(Collections.singleton(topic));
System.out.println("Waiting for the data...");
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(5000));
for(ConsumerRecord<String,GenericRecord> record: records) {
System.out.print(record.value());
}
// consumer.commitSync();
}
}
}
构建的代码是成功的。我希望在这里获得控制台生成的记录,但我什么也没得到:
请建议。
我从这里参考:
https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-avro.html
解决方案
推荐阅读
- r - R:无法读取闪亮的对象错误:不允许从闪亮的输出对象读取对象
- javascript - 当我尝试从 mqtt 频道获取纬度和经度值时,我得到 NaN 值?我尝试将它们解析为浮动但仍以 NaN 值结束
- mariadb - 在 grafana 中使用分隔符(Mariadb)
- c# - 如何调用API并行处理而不是一一传递URI
- java - 从 Java 代码连接到 Azure Cosmos DB 时出现 UnknownHostException
- swift - 如何在 Swift 5 中以编程方式更改导航栏的背景颜色?
- android - 即使在更改数据库规则以允许所有读取和写入后,也无法从 firestore 获取数据
- postgresql - 如果列类型是使用 JPA 的 JSONB,我如何将 JSON 字符串存储到 PostgreSQL 中?
- python - 使用 pandas 从数据框中过滤掉数据
- android - 图像编码为 Base64 后左旋转 90 度