首页 > 解决方案 > 轮询函数Java Kafka Consumer出错

问题描述

这是我的消费类。Alaways 我在轮询函数“KafkaConsumer<String,String> 类型中的方法 poll(long) 不适用于参数(持续时间)”和获取记录时出错“只能迭代数组或 java 实例.lang.Iterable'。我正在使用 JDK 1.8。

import org.apache.kafka.clients.consumer.ConsumerConfig;  
import org.apache.kafka.clients.consumer.ConsumerRecord;  
import org.apache.kafka.clients.consumer.ConsumerRecords;  
import org.apache.kafka.clients.consumer.KafkaConsumer;  
import org.apache.kafka.common.serialization.StringDeserializer;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
  
import java.time.Duration;  
import java.util.Arrays;  
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties; 
public class KaConsumer {

    public static void main(String[] args) { 
        Logger logger= LoggerFactory.getLogger(KaConsumer.class.getName());  
        String bootstrapServers="127.0.0.1:9092";  
        String grp_id="My_App";  
        String topic="Tweet01";
        Properties properties=new Properties();  
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);  
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());  
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());  
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,grp_id);  
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

      //creating consumer  
        KafkaConsumer<String,String> Consumer = new KafkaConsumer<String,String>(properties);
        Consumer.subscribe("Tweet01");

        
        while (true) {
            ConsumerRecords<Integer,String> records=Consumer.poll(Duration.ofMillis(100));  
            for(ConsumerRecord<String,String> record: records){  
            System.out.println(record);
            }
        }
      }
}

标签: javaapache-kafkakafka-consumer-api

解决方案


KafkaConsumer<String,String> 类型中的 poll(long) 方法不适用于参数 (Duration)'

该错误告诉您替换Duration.ofMillis(100)100. 如果您想使用 Duration,您可以升级您的客户端版本

只能遍历数组或 java.lang.Iterable 的实例

for 循环看起来不错,ConsumerRecords确实实现了 Iterable 接口,但是键中的类型不匹配

FWIW,如果目的只是打印记录,我建议使用 Kafka Streams 或其他更高级别的库


推荐阅读