首页 > 解决方案 > Java Kafka消费者不会从它离开的地方恢复

问题描述

我有生产者线程将数据推送到 Kafka 主题中,一个消费者线程一次轮询每个项目。这是消费者配置:

    private final KafkaConsumer<Long, Integer> consumer;

    public static final String TOPIC = "requestsss";

    {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

    }

消费者循环运行以下代码:

  ConsumerRecords<Long, Integer> consumerRecords = consumer.poll(Duration.ofMillis(10000));
  consumer.commitSync();
  return consumerRecords.records(TOPIC).iterator().next().value();

然后消费者进程在没有消费所有产生的消息的情况下停止。问题是当它重新启动时,它不会从它离开的地方恢复进程。相反,它就好像所有消息都在前一次运行中被消耗掉了。

有谁知道我做错了什么?

注意:我也尝试过设置

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

既是真又是假,也去除了consumer.commitSync(),但都没有产生预期的效果。

谢谢您的帮助

标签: javaapache-kafka

解决方案


推荐阅读