apache-kafka - 偏移量为 0 的 Kafka 消费者轮询未返回消息
问题描述
我正在使用 spring-kafka 来轮询消息,当我为消费者使用注释并将偏移量设置为 0 时,它将最早看到所有消息。但是当我尝试使用注入的 ConsumerFactory 自己创建消费者时,poll 只会返回几条消息或根本没有消息。为了能够提取消息,我还需要其他一些配置吗?轮询超时已设置为 10 秒。
@Component
public class GenericConsumer {
private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);
@Autowired
ConsumerFactory<String, Record> consumerFactory;
public ConsumerRecords<String, Record> poll(String topic, String group){
logger.info("---------- Polling kafka recrods from topic " + topic + " group" + group);
Consumer<String, Record> consumer = consumerFactory.createConsumer(group, "");
consumer.subscribe(Arrays.asList(topic));
// need to make a dummy poll before we can seek
consumer.poll(1000);
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<String, Record> records;
records = consumer.poll(10000);
logger.info("------------ Total " + records.count() + " records polled");
consumer.close();
return records;
}
}
解决方案
它对我来说很好,这是 boot 2.0.5,Spring Kafka 2.1.10 ...
@SpringBootApplication
public class So52284259Application implements ConsumerAwareRebalanceListener {
private static final Logger logger = LoggerFactory.getLogger(So52284259Application.class);
public static void main(String[] args) {
SpringApplication.run(So52284259Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template, GenericConsumer consumer) {
return args -> {
// for (int i = 0; i < 1000; i++) { // load up the topic on first run
// template.send("so52284259", "foo" + i);
// }
consumer.poll("so52284259", "generic");
};
}
@KafkaListener(id = "listener", topics = "so52284259")
public void listen(String in) {
if ("foo999".equals(in)) {
logger.info("@KafkaListener: " + in);
}
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
@Bean
public NewTopic topic() {
return new NewTopic("so52284259", 1, (short) 1);
}
}
@Component
class GenericConsumer {
private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);
@Autowired
ConsumerFactory<String, String> consumerFactory;
public void poll(String topic, String group) {
logger.info("---------- Polling kafka recrods from topic " + topic + " group" + group);
Consumer<String, String> consumer = consumerFactory.createConsumer(group, "");
consumer.subscribe(Arrays.asList(topic));
// need to make a dummy poll before we can seek
consumer.poll(1000);
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<String, String> records;
boolean done = false;
while (!done) {
records = consumer.poll(10000);
logger.info("------------ Total " + records.count() + " records polled");
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
String value = iterator.next().value();
if ("foo999".equals(value)) {
logger.info("Consumer: " + value);
done = true;
}
}
}
consumer.close();
}
}
和
2018-09-12 09:35:25.929 INFO 61390 --- [ main] com.example.GenericConsumer : ------------ Total 500 records polled
2018-09-12 09:35:25.931 INFO 61390 --- [ main] com.example.GenericConsumer : ------------ Total 500 records polled
2018-09-12 09:35:25.932 INFO 61390 --- [ main] com.example.GenericConsumer : Consumer: foo999
2018-09-12 09:35:25.942 INFO 61390 --- [ listener-0-C-1] com.example.So52284259Application : @KafkaListener: foo999
推荐阅读
- reactjs - “ReactJs”每个产品的图片库
- jquery - 在圆内居中加/减切换元素
- javascript - 导航栏中的 Bootstrap 下拉菜单在没有数据的内容页面上工作,但如果我添加一些数据,它就不再起作用了
- vba - 在使用 InsertFile 方法插入文件之前添加额外空间的问题
- ios - 测试此 String init 方法是否将字符串更改为不同的语言?
- java - JAXB 解组 JSON HTTP POST 参数
- google-sheets - 使用查询中的单元格引用获取结果总和的 Google 查询语言
- android - 使用布局作为另一个布局的背景
- amazon-web-services - Amazon Elastic IP 的定价和问题
- php - 如何从另一个表中获取总数