Kafka reactor - how to disable auto-start of the Kafka consumer?

Problem description

Below is my Kafka consumer:

@Bean("kafkaConfluentInboundReceiver")
@ConditionalOnProperty(value = "com.demo.kafka.core.inbound.confluent.topic-name",
        matchIfMissing = false)
public KafkaReceiver<String, Object> kafkaInboundReceiver() {
    ReceiverOptions<String, Object> receiverOptions = ReceiverOptions.create(inboundConsumerConfigs());
    // Run the receive loop on a dedicated executor-backed scheduler
    receiverOptions.schedulerSupplier(() -> Schedulers
            .fromExecutorService(applicationContext.getBean("inboundKafkaExecutorService", ExecutorService.class)));
    receiverOptions.maxCommitAttempts(kafkaProperties.getKafka().getCore().getMaxCommitAttempts());
    return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
            .subscription(Collections.singleton(
                    kafkaProperties.getKafka()
                            .getCore().getInbound().getConfluent()
                            .getTopicName()))
            // commitInterval ZERO + commitBatchSize 0 disable periodic auto-commit;
            // offsets are committed manually via receiverOffset().commit()
            .commitInterval(Duration.ZERO).commitBatchSize(0));
}

My Kafka consumer starts automatically with the application. I want to disable this auto-start behavior.

I know that in Spring Kafka we can do something like this:

factory.setAutoStartup(start);
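
For reference, this is how that flag is typically wired in Spring Kafka. A minimal sketch, assuming a boolean property like the consumer.autostart one introduced below (the factory bean itself is illustrative, not taken from the original code):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    // Hypothetical property; defaults to true so existing behavior is unchanged
    @Value("${consumer.autostart:true}")
    private boolean start;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Containers created by this factory will not be started on context
        // refresh unless 'start' is true
        factory.setAutoStartup(start);
        return factory;
    }
}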

However, I am not sure how to achieve the same (controlling the auto start/stop behavior) in reactor-kafka. I would like something like the following.

Introduce a property to handle the auto start/stop behavior:

@Value("${consumer.autostart:true}")
private boolean start;

Using the above property, I should be able to set the Kafka auto-start flag in reactor-kafka, like this:

return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
        .subscription(Collections.singleton(
                kafkaProperties.getKafka()
                        .getCore().getInbound().getConfluent()
                        .getTopicName()))
        .commitInterval(Duration.ZERO).commitBatchSize(0)).setAutoStart(start);

Note the .setAutoStart(start) at the end.

Is this possible in reactor-kafka, and if so, how do I do it?

Update:

protected void inboundEventHubListener(String topicName, List<String> allowedValues) {
    Scheduler scheduler = Schedulers.fromExecutorService(kafkaExecutorService);
    kafkaEventHubInboundReceiver
            .receive()
            .publishOn(scheduler)
            .groupBy(receiverRecord -> {
                try {
                    return receiverRecord.receiverOffset().topicPartition();
                } catch (Throwable throwable) {
                    log.error("exception in groupby", throwable);
                    return Flux.empty();
                }
            }).flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
            .map(record -> {
                processMessage(record, topicName, allowedValues).block(
                        Duration.ofSeconds(60L)); // block() subscribes to trigger processing of the message
                return record;
            }).concatMap(message -> {
                log.info("Received message after processing offset: {} partition: {} ",
                         message.offset(), message.partition());
                return message.receiverOffset()
                        .commit()
                        .onErrorContinue((t, o) -> log.error(
                                String.format("exception raised while commit offset %s", o), t)
                        );
            })).onErrorContinue((t, o) -> {
        try {
            if (null != o) {
                ReceiverRecord<String, Object> record = (ReceiverRecord<String, Object>) o;
                ReceiverOffset offset = record.receiverOffset();
                log.debug("failed to process message: {} partition: {} and message: {} ",
                          offset.offset(), record.partition(), record.value());
            }
            log.error(String.format("exception raised while processing message %s", o), t);
        } catch (Throwable inner) {
            log.error("encountered error in onErrorContinue", inner);
        }
    }).subscribeOn(scheduler).subscribe();
}

Can I do something like this?

kafkaEventHubInboundReceiverObj = kafkaEventHubInboundReceiver.....subscribeOn(scheduler);
if (consumer.autostart) {
    kafkaEventHubInboundReceiverObj.subscribe();
}

Tags: apache-kafka, spring-webflux, project-reactor, reactor, reactor-kafka

Solution

With reactor-kafka there is no concept of "auto start"; you are in complete control.

The consumer is not "started" until you subscribe to the Flux returned from receiver.receive().

Simply delay the flux.subscribe() until you are ready to consume data.
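
A minimal sketch of that approach, assuming the consumer.autostart property from the question and Spring's SmartLifecycle so the reactive consumer mimics a Spring Kafka container's setAutoStartup() (the bean wiring and the processing body are placeholders):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.kafka.receiver.KafkaReceiver;

@Component
public class InboundConsumerLifecycle implements SmartLifecycle {

    private final KafkaReceiver<String, Object> receiver;
    private volatile Disposable subscription;

    // Hypothetical property from the question; defaults to auto-start
    @Value("${consumer.autostart:true}")
    private boolean autostart;

    public InboundConsumerLifecycle(KafkaReceiver<String, Object> receiver) {
        this.receiver = receiver;
    }

    @Override
    public boolean isAutoStartup() {
        // Spring calls start() on context refresh only when this returns true
        return autostart;
    }

    @Override
    public void start() {
        // Nothing is polled from Kafka until this subscribe() call
        subscription = receiver.receive()
                .doOnNext(record -> { /* process and commit the record here */ })
                .subscribe();
    }

    @Override
    public void stop() {
        // Cancelling the subscription stops the consumer
        if (subscription != null) {
            subscription.dispose();
        }
    }

    @Override
    public boolean isRunning() {
        return subscription != null && !subscription.isDisposed();
    }
}

With this in place, start() and stop() can also be called later (for example from a management endpoint) to begin or halt consumption on demand.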

