首页 > 解决方案 > Kafka Consumer API 跳跃偏移量

问题描述

我正在使用 Kafka 2.0 版和 java 消费者 API 来使用来自主题的消息。我们使用单节点 Kafka 服务器,每个分区有一个消费者。我观察到消费者正在丢失一些消息。场景是:消费者轮询主题。我为每个线程创建了一个消费者。获取消息并将其提供给处理程序以处理消息。然后它使用“至少一次”Kafka Consumer 语义来提交偏移量以提交 Kafka 偏移量。同时,我有另一个使用不同组 ID 运行的消费者。在这个消费者中,我只是增加消息计数器并提交偏移量。这个消费者没有消息丢失。

try {
    //kafkaConsumer.registerTopic();

    consumerThread = new Thread(() -> {
        final String topicName1 = "topic-0";
        final String topicName2 = "topic-1";
        final String topicName3 = "topic-2";
        final String topicName4 = "topic-3";

        String groupId = "group-0";
        final Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.13.49:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        try {
            consumer = new KafkaConsumer<>(consumerProperties);
            consumer.subscribe(Arrays.asList(topicName1, topicName2, topicName3, topicName4));
        } catch (KafkaException ke) {
            logTrace(MODULE, ke);
        }
        while (service.isServiceStateRunning()) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, byte[]>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, byte[]> record : partitionRecords) {
                    processMessage(simpleMessage);

                }
            }
            consumer.commitSync();
        }
        kafkaConsumer.closeResource();
    }, "KAKFA_CONSUMER");

} catch (Exception e) {
}

标签: apache-kafka

解决方案


在这里使用 subscribe() 似乎有问题。

订阅用于订阅主题而不是分区。要使用特定分区,您需要使用 assign()。阅读文档中的摘录:

公共无效订阅(java.util.Collection 主题)

订阅给定的主题列表以获取动态分配的分区。主题订阅不是增量的。此列表将替换当前分配(如果有的话)。无法通过 assign(Collection) 将主题订阅与组管理与手动分区分配相结合。如果给定的主题列表为空,则将其视为与 unsubscribe() 相同。这是 subscribe(Collection, ConsumerRebalanceListener) 的简写,它使用 noop 侦听器。如果您需要寻找特定偏移量的能力,您应该更喜欢 subscribe(Collection, ConsumerRebalanceListener),因为组重新平衡会导致分区偏移量被重置。


公共无效分配(java.util.Collection 分区)

手动将分区列表分配给此使用者。该接口不允许增量赋值,将替换之前的赋值(如果有的话)。如果给定的主题分区列表为空,则将其视为与 unsubscribe() 相同。通过这种方法手动分配主题不使用消费者的组管理功能。因此,当组成员或集群和主题元数据发生变化时,不会触发重新平衡操作。请注意,不能将手动分区分配与 assign(Collection) 和组分配与 subscribe(Collection, ConsumerRebalanceListener) 一起使用。


推荐阅读