apache-kafka - Kafka reactor - 如何禁用自动启动的 KAFKA 消费者?
问题描述
下面是我的 KAFKA 消费者
@Bean("kafkaConfluentInboundReceiver")
@ConditionalOnProperty(value = "com.demo.kafka.core.inbound.confluent.topic-name",
matchIfMissing = false)
public KafkaReceiver<String, Object> kafkaInboundReceiver() {
ReceiverOptions<String, Object> receiverOptions = ReceiverOptions.create(inboundConsumerConfigs());
receiverOptions.schedulerSupplier(() -> Schedulers
.fromExecutorService(applicationContext.getBean("inboundKafkaExecutorService", ExecutorService.class)));
receiverOptions.maxCommitAttempts(kafkaProperties.getKafka().getCore().getMaxCommitAttempts());
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(
kafkaProperties.getKafka()
.getCore().getInbound().getConfluent()
.getTopicName()))
.commitInterval(Duration.ZERO).commitBatchSize(0));
}
我的 KAFKA 消费者正在自动启动。但是我想禁用自动启动的 KAFKA 消费者。
我知道了,在春天的 KAFKA 我们可以做这样的事情
factory.setAutoStartup(start);
但是,我不确定如何在 Kafka 反应器中实现(控制自动启动/停止行为)。我想要下面的东西
引入一个属性来处理自动启动/停止行为
@Value("${consumer.autostart:true}")
private boolean start;
使用上述属性,我应该能够在 Kafka 反应器中设置 KAFKA 自动启动标志,就像这样
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
.subscription(Collections.singleton(
kafkaProperties.getKafka()
.getCore().getInbound().getConfluent()
.getTopicName()))
.commitInterval(Duration.ZERO).commitBatchSize(0)).setAutoStart(start);
注意:.setAutoStart(start);
这在 Kafka 反应器中是否可行,如果可以,我该怎么做?
更新:
protected void inboundEventHubListener(String topicName, List<String> allowedValues) {
Scheduler scheduler = Schedulers.fromExecutorService(kafkaExecutorService);
kafkaEventHubInboundReceiver
.receive()
.publishOn(scheduler)
.groupBy(receiverRecord -> {
try {
return receiverRecord.receiverOffset().topicPartition();
} catch (Throwable throwable) {
log.error("exception in groupby", throwable);
return Flux.empty();
}
}).flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
.map(record -> {
processMessage(record, topicName, allowedValues).block(
Duration.ofSeconds(60L));//This subscribe is to trigger processing of a message
return record;
}).concatMap(message -> {
log.info("Received message after processing offset: {} partition: {} ",
message.offset(), message.partition());
return message.receiverOffset()
.commit()
.onErrorContinue((t, o) -> log.error(
String.format("exception raised while commit offset %s", o), t)
);
})).onErrorContinue((t, o) -> {
try {
if (null != o) {
ReceiverRecord<String, Object> record = (ReceiverRecord<String, Object>) o;
ReceiverOffset offset = record.receiverOffset();
log.debug("failed to process message: {} partition: {} and message: {} ",
offset.offset(), record.partition(), record.value());
}
log.error(String.format("exception raised while processing message %s", o), t);
} catch (Throwable inner) {
log.error("encountered error in onErrorContinue", inner);
}
}).subscribeOn(scheduler).subscribe();
我可以做这样的事情吗?
kafkaEventHubInboundReceiverObj = kafkaEventHubInboundReceiver.....subscribeOn(scheduler);
if(consumer.autostart) {
kafkaEventHubInboundReceiverObj.subscribe();
}
解决方案
With reactor-kafka
there is no concept of "auto start"; you are in complete control.
The consumer is not "started" until you subscribe to the Flux
returned from receiver.receive()
.
Simply delay the flux.subscribe()
until you are ready to consume data.
推荐阅读
- python - 函数内部的 int(intput()) 不起作用?
- ubuntu - Vscode如何用文件/目录/应用文件隐藏顶部栏?
- ruby-on-rails - Rails 验证码属性
- tensorflow - 有没有办法对 ImageDataGenerator 流进行后处理?
- android - 无法运行我的 android 仪器测试。清单中包含提供程序的应用
- r - 将闪亮的 R 应用程序部署到 shinyapps.io 会导致连接字符串出现 SQL 连接错误
- spring-mvc - 警告应用程序将关闭的示例通知
- php - 不允许序列化“闭包” Laravel 验证
- java - 如何在 logback.xml 中定义 Map?
- python - 检查目录中的文件夹数量(没有文件或子文件夹)