首页 > 解决方案 > Java Kafka Consumer在多线程中的使用

问题描述

我正在考虑在线程池中使用 Kafka Consumer。我提出了这种方法。现在它似乎工作正常,但我正在考虑缺点以及这种方法会带来什么问题。基本上我需要的是将记录处理与消费分离。此外,我需要有一个强有力的保证,即只有在处理完所有记录后才会提交。有人可以就如何更好地做到这一点提出建议或建议吗?

  final var consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(topics);
    final var threadPool = Executors.newFixedThreadPool(32);

    while(true) {

        ConsumerRecords<String, String> records;

        synchronized (consumer) {
            records = consumer.poll(Duration.ofMillis(100));
        }

        CompletableFuture.runAsync(this::processTask, threadPool).thenRun(() -> {
            synchronized (consumer) {
                consumer.commitSync();
            }
        });
    }

标签: javaapache-kafkakafka-consumer-api

解决方案


问题

此解决方案对于所述要求并不稳健:

另外,我需要有一个强有力的保证,即只有在处理完所有记录后才会提交

设想:

  1. 轮询读取 100 条记录,开始异步处理
  2. 轮询读取 5 条记录,开始异步处理
  3. 立即处理 5 条记录,并在处理 100 条记录时完成消费者提交
  4. 消费者崩溃

当消费者再次启动时,最后一次提交将对应于第 105 条记录。因此它将开始处理第 106 条记录,我们错过了成功处理 1-100 条记录。

您只需通过以下方式提交您在该轮询中处理的偏移量:

void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);

此外,需要保证顺序,以便首先提交第一个轮询,然后是第二个,依此类推。这将是相当复杂的。

主张

我相信您正在尝试在消息处理中实现并发。这可以通过更简单的解决方案来实现。增加你的max.poll.records以读取一个像样的批次,将其分成更小的批次并异步运行它们以实现并发。完成所有批次后,提交给 kafka 消费者。


推荐阅读