首页 > 解决方案 > 从java中的主题读取数据时没有收到avro消息

问题描述

我第一次在 java 中编写代码来使用来自 kafka 主题的 AVRO 数据。我正在使用 kafka-avro-console-producer 来制作记录。我在 Docker 上使用 leneseio/fast-data-dev 映像来升级 kafka 堆栈。

生产记录:

root@fast-data-dev / $ kafka-avro-console-producer --broker-list localhost:9092 --topic payengine --property schema.registry.url=http://localhost:8081 --property value.schema='{"type":"record", "name":"payengine", "fields":[{"name":"tin", "type":"string"},{"name":"ach","type":"string"}] }'
{"tin":"61582","ach":"I"}
{"tin":"97820","ach":"I"}

现在,为了阅读这条记录,我写了下面的代码。此外,似乎我不必在使用记录时引用架构(如下面的参考链接中所述)。我还经历了一个示例,其中使用 SpecificAvroRecord 代替 GenericRecord,但这需要基于模式构建类。我不确定 GenericRecord 如何指向正确的架构,但在示例中看不到任何架构引用。

package com.github.psingh.Kafka;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer_AvroSchema {

    public static void main(String[] args) {
        // System.out.println("Hello Kafka ");

        // setting properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        //name topic
        String topic = "payengine";

        // create the consumer
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);

        //subscribe to topic
        consumer.subscribe(Collections.singleton(topic));

        System.out.println("Waiting for the data...");

        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(5000));
            for(ConsumerRecord<String,GenericRecord> record: records) {

                System.out.print(record.value());

            }

           // consumer.commitSync();
        }

    }
}

构建的代码是成功的。我希望在这里获得控制台生成的记录,但我什么也没得到:

在此处输入图像描述

请建议。

我从这里参考:

https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-avro.html

标签: javaconfluent-schema-registryconfluent-platform

解决方案


推荐阅读