java - 轮询函数Java Kafka Consumer出错
问题描述
这是我的消费类。Alaways 我在轮询函数“KafkaConsumer<String,String> 类型中的方法 poll(long) 不适用于参数(持续时间)”和获取记录时出错“只能迭代数组或 java 实例.lang.Iterable'。我正在使用 JDK 1.8。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KaConsumer {
public static void main(String[] args) {
Logger logger= LoggerFactory.getLogger(KaConsumer.class.getName());
String bootstrapServers="127.0.0.1:9092";
String grp_id="My_App";
String topic="Tweet01";
Properties properties=new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,grp_id);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//creating consumer
KafkaConsumer<String,String> Consumer = new KafkaConsumer<String,String>(properties);
Consumer.subscribe("Tweet01");
while (true) {
ConsumerRecords<Integer,String> records=Consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record: records){
System.out.println(record);
}
}
}
}
解决方案
KafkaConsumer<String,String> 类型中的 poll(long) 方法不适用于参数 (Duration)'
该错误告诉您替换Duration.ofMillis(100)
为100
. 如果您想使用 Duration,您可以升级您的客户端版本
只能遍历数组或 java.lang.Iterable 的实例
for 循环看起来不错,ConsumerRecords
确实实现了 Iterable 接口,但是键中的类型不匹配
FWIW,如果目的只是打印记录,我建议使用 Kafka Streams 或其他更高级别的库
推荐阅读
- postgresql - Jelastic 5.4 上的 PostgreSQL 9.6 中不提供区域设置
- angularjs - 如何在java中获取列表的大小?
- excel - IF 语句中出现明显的 Excel 计算错误
- spring - 向 mongo 聚合的结果添加新字段
- javascript - 未捕获的类型错误:无法在“节点”上执行“appendChild”:参数 1 不是“节点”类型。JAVASCRIPT
- gatling - 如何检查加特林会话中是否存在变量?
- ruby-on-rails - Ruby 中 GraphQL 类型的字段定义中感叹号的目的是什么?
- windows - 使用 docker 独立于 Windows 更新
- c# - 向下转换对象不会改变对象本身?
- javascript - 带 rangeChart 的 elasticY(true) 不适应新范围