How to stop the container when all partitions are paused?

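Problem description

I have a use case in which I create dynamic listeners based on an API call. After receiving a certain message, I pause partitions individually. For example, if I receive a message with offset 100, that partition is paused. This is done for every partition. Once all partitions are paused, I want to stop the container.

Here is my code: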

public class OffsetBasedMessageListener implements ConsumerAwareMessageListener<String, String> {

  @Override
  public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
    if (consumerRecord.offset() == 100) {
      // Pauses the Kafka consumer directly; the container is not told about it.
      consumer.pause(Collections.singleton(
          new TopicPartition(consumerRecord.topic(), consumerRecord.partition())));
    }
  }
}

Next, I configured an EventListener that fires when the container is idle.

  @EventListener
  public void onIdle(ListenerContainerIdleEvent event) {
    Collection<TopicPartition> collection =
        event.getContainer(ConcurrentMessageListenerContainer.class).getAssignedPartitions();
    for (TopicPartition topicPartition : collection) {
      System.out.println(
          event.getContainer(ConcurrentMessageListenerContainer.class).isPartitionPaused(topicPartition));
    }
  }

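So what I am doing now is checking whether all partitions are paused, and then stopping the container. But isPartitionPaused always returns false, even when it should return true.

I am using Spring Boot with Spring Kafka.

Can someone tell me what I am doing wrong? Or is there another way to achieve this?

Thanks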

Tags: java, spring, spring-boot, spring-kafka

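Solution

You should not pause the consumer directly. The container does not know that you paused it, so isPartitionPaused returns false.

Instead, call pausePartition on the container.

Note that either way, the consumer will not actually pause until all records returned by the previous poll have been processed. Set max.poll.records to 1 to pause immediately.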
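To illustrate why max.poll.records matters here, the following is a toy model of a poll loop for a single partition. It is not Spring Kafka's code: the class PauseModel and the method deliveredOffsets are hypothetical names, and the model simply assumes that a pause requested inside the listener only takes effect before the next poll, so the rest of an already-fetched batch is still delivered.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single-partition poll loop; not Spring Kafka's implementation. */
final class PauseModel {

    private PauseModel() {
    }

    /**
     * Returns the offsets handed to the listener when the listener requests a
     * pause at pauseAtOffset and the pause is only honoured before the next poll.
     */
    public static List<Long> deliveredOffsets(long totalRecords, int maxPollRecords, long pauseAtOffset) {
        List<Long> delivered = new ArrayList<>();
        boolean pauseRequested = false;
        long next = 0;
        // Once the pause is in effect, polling returns nothing for this partition.
        while (next < totalRecords && !pauseRequested) {
            long batchEnd = Math.min(next + maxPollRecords, totalRecords);
            // Every record of the batch that was already fetched still reaches the listener.
            for (long offset = next; offset < batchEnd; offset++) {
                delivered.add(offset);
                if (offset == pauseAtOffset) {
                    pauseRequested = true;
                }
            }
            next = batchEnd;
        }
        return delivered;
    }

    public static void main(String[] args) {
        // One large batch: records after offset 5 are still delivered.
        System.out.println(deliveredOffsets(10, 10, 5).size());
        // One record per poll: delivery stops right after offset 5.
        System.out.println(deliveredOffsets(10, 1, 5).size());
    }
}
```

In this model, a batch size of 10 delivers all ten records even though the pause was requested at offset 5, while a batch size of 1 delivers only offsets 0 through 5.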

Edit

This works fine for me...

@SpringBootApplication
@RestController
public class So67430862Application {


    private static final Logger LOG = LoggerFactory.getLogger(So67430862Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So67430862Application.class, args);
    }

    Set<ConcurrentMessageListenerContainer<String, String>> containers = ConcurrentHashMap.newKeySet();

    int id;

    @Autowired
    Creator creator;

    @PostMapping(path = "/send/{topic}")
    public void sendFoo(@PathVariable String topic) {
        ConcurrentMessageListenerContainer<String, String> container = this.creator.create(topic);
        container.getContainerProperties().setGroupId("group" + ++id);
        container.getContainerProperties().setMessageListener(new Listener(container));
        this.containers.add(container);
        container.start();
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        ConcurrentMessageListenerContainer<String, String> container =
                event.getContainer(ConcurrentMessageListenerContainer.class);
        if (this.containers.contains(container)) {
            boolean allPaused = container.getAssignedPartitions()
                    .stream()
                    .map(part -> container.isPartitionPaused(part))
                    .allMatch(paused -> paused);
            LOG.info("All paused? {}", allPaused);
            if (allPaused) {
                container.stop(() -> { });
                this.containers.remove(container);
            }
        }
    }

//  @Bean
//  public NewTopic topic() {
//      return TopicBuilder.name("so67430862").partitions(5).replicas(1).build();
//  }
//
//
//  @Bean
//  public ApplicationRunner runner(KafkaTemplate<String, String> template) {
//      return args -> {
//          IntStream.range(0, 5).forEach(p -> {
//              IntStream.range(0, 10).forEach(i ->template.send("so67430862", p, null, "test"));
//          });
//      };
//  }

}

@Component
class Creator {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    Creator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    ConcurrentMessageListenerContainer<String, String> create(String topic) {
        return this.factory.createContainer(topic);
    }

}

class Listener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    final ConcurrentMessageListenerContainer<String, String> container;

    Listener(ConcurrentMessageListenerContainer<String, String> container) {
        this.container = container;
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        LOG.info(ListenerUtils.recordToString(data, true));
        if (data.offset() == 5) {
            LOG.info("Pausing partition {}", data.partition());
            this.container.pausePartition(new TopicPartition(data.topic(), data.partition()));
        }
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

spring.kafka.consumer.max-poll-records=1
spring.kafka.listener.idle-event-interval=5000

2021-05-10 11:51:15.792  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-0@0
2021-05-10 11:51:15.797  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-0@1
2021-05-10 11:51:15.799  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-0@2
2021-05-10 11:51:15.801  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-0@3
2021-05-10 11:51:15.803  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-0@4
2021-05-10 11:51:15.809  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-0@5
2021-05-10 11:51:15.809  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : Pausing partition 0
2021-05-10 11:51:15.814  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-1@0
2021-05-10 11:51:15.816  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-1@1
2021-05-10 11:51:15.819  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-1@2
2021-05-10 11:51:15.823  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-1@3
2021-05-10 11:51:15.828  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-1@4
2021-05-10 11:51:15.831  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-1@5
2021-05-10 11:51:15.831  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : Pausing partition 1
2021-05-10 11:51:15.835  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-4@0
2021-05-10 11:51:15.838  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-4@1
2021-05-10 11:51:15.841  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-4@2
2021-05-10 11:51:15.844  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-4@3
2021-05-10 11:51:15.846  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-4@4
2021-05-10 11:51:15.849  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-4@5
2021-05-10 11:51:15.850  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : Pausing partition 4
2021-05-10 11:51:15.854  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-2@0
2021-05-10 11:51:15.858  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-2@1
2021-05-10 11:51:15.861  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-2@2
2021-05-10 11:51:15.864  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-2@3
2021-05-10 11:51:15.866  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-2@4
2021-05-10 11:51:15.871  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-2@5
2021-05-10 11:51:15.871  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : Pausing partition 2
2021-05-10 11:51:15.875  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-3@0
2021-05-10 11:51:15.877  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-3@1
2021-05-10 11:51:15.880  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-3@2
2021-05-10 11:51:15.883  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-3@3
2021-05-10 11:51:15.886  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-3@4
2021-05-10 11:51:15.889  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : so67430862-3@5
2021-05-10 11:51:15.889  INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener                : Pausing partition 3
2021-05-10 11:51:20.893  INFO 58682 --- [ consumer-0-C-1] com.example.demo.So67430862Application   : All paused? true
2021-05-10 11:51:20.895  INFO 58682 --- [ consumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions so67430862-0, so67430862-1, so67430862-4, so67430862-2, so67430862-3
2021-05-10 11:51:20.895  INFO 58682 --- [ consumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group1: partitions revoked: [so67430862-0, so67430862-1, so67430862-4, so67430862-2, so67430862-3]
2021-05-10 11:51:20.895  INFO 58682 --- [ consumer-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-be137c2c-2bbf-4235-a143-15d33a0cfe52 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
2021-05-10 11:51:20.896  INFO 58682 --- [ consumer-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group1-1, groupId=group1] Unsubscribed all topics or patterns and assigned partitions
2021-05-10 11:51:20.896  INFO 58682 --- [ consumer-0-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2021-05-10 11:51:20.897  INFO 58682 --- [ consumer-0-C-1] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2021-05-10 11:51:20.897  INFO 58682 --- [ consumer-0-C-1] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-05-10 11:51:20.898  INFO 58682 --- [ consumer-0-C-1] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2021-05-10 11:51:20.901  INFO 58682 --- [ consumer-0-C-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.consumer for consumer-group1-1 unregistered
2021-05-10 11:51:20.902  INFO 58682 --- [ consumer-0-C-1] essageListenerContainer$ListenerConsumer : group1: Consumer stopped

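The commented-out code creates a topic with 5 partitions and 10 records in each partition; the listener pauses each partition at offset 5. As you can see, we don't receive the remaining records.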
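One detail worth keeping in mind with the "all paused" check: Stream.allMatch returns true for an empty stream, so an idle event that arrives while no partitions are assigned would also count as "all paused". A minimal sketch of a guard is shown below; PauseChecks and allPaused are hypothetical helper names, written against plain collections so the snippet runs without Kafka on the classpath.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper; not part of Spring Kafka. */
final class PauseChecks {

    private PauseChecks() {
    }

    /**
     * True only when at least one partition is assigned and every assigned
     * partition is reported as paused by the given predicate.
     */
    public static <T> boolean allPaused(Collection<T> assigned, Predicate<? super T> isPaused) {
        if (assigned == null || assigned.isEmpty()) {
            // Nothing assigned: do not treat this as "everything is paused".
            return false;
        }
        return assigned.stream().allMatch(isPaused);
    }

    public static void main(String[] args) {
        List<Integer> partitions = Arrays.asList(0, 1, 2);
        System.out.println(allPaused(partitions, p -> true));
        System.out.println(allPaused(partitions, p -> p != 1));
        System.out.println(allPaused(Collections.<Integer>emptyList(), p -> true));
    }
}
```

In the idle handler above, this could presumably be called as PauseChecks.allPaused(container.getAssignedPartitions(), container::isPartitionPaused), so that the container is stopped only when it actually owns partitions and all of them are paused.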
