java - Kafka 消费者不返回任何记录
问题描述
我正在尝试用 Kafka 制作一个小型 PoC。但是,在 java 中创建消费者时,此消费者没有收到任何消息。即使当我使用相同的 url/topic 启动 kafka-console-consumer.sh 时,我还是会收到消息。有谁知道我可能做错了什么?此代码由 GET API 调用。
public List<KafkaTextMessage> receiveMessages() {
log.info("Retrieving messages from kafka");
val props = new Properties();
// See https://kafka.apache.org/documentation/#consumerconfigs
props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
//props.put("client.id", "my-topic consumer");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
ImmutableList.Builder<KafkaTextMessage> builder;
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(TEXT_MESSAGE_TOPIC));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
builder = ImmutableList.builder();
for (ConsumerRecord<String, String> record : records) {
builder.add(new KafkaTextMessage(record.value()));
log.info("We got at position: {} key:{} value: {}", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
return builder.build();
}
解决方案
尝试添加auto.offset.reset=earliest
您的消费者属性。默认值设置为latest
。我建议这样做是因为我看到您group.id
设置为test
, 您可能已经在以前的测试中使用过的值。
推荐阅读
- python - 将新列插入数据框中会给出“ValueError:值的长度 (4) 与索引的长度 (6) 不匹配”
- swift - 字体大小可调的 UITextView
- c++ - 使用 libjpeg 在 C++ 中进行 JPEG 图像旋转
- python - python从远程文本文件读取并运行命令
- kotlin - 将 Kotlin 协程与 Spring Kafka 侦听器一起使用
- r - R中的randomForst模型中的ntree是什么意思?
- c++ - 如何更改我的类私有属性值?
- html - Flexbox 标头在移动设备上变成下拉菜单
- amazon-web-services - AWS CredentialProviders 无法在 Fargate 中检索凭证
- sql - 删除表中所有小于去年当前日期的记录,但不要删除过去 2 年给定月份的结束日期