首页 > 解决方案 > Reactor Flux - 仅在完成时从 Publisher 发出

问题描述

我有一些 Reactor Kafka 代码,它通过 a 读取事件KafkaReceiver并通过 1 或多个KafkaSenders连接到单个Publisher. 一切都很好,但我想做的只是Flux在完成时从这个连接的发送者发出一个事件(即,它已完成写入任何给定事件的所有下游主题,因此它不会为每个元素发出任何内容它向下游写入 Kafka 直到完成)。通过这种方式,我可以sample()并定期提交偏移量,因为我知道每当发生这种sample()情况时,我都会为传入事件提交偏移量,我已经为每个要提交偏移量的事件处理了所有下游消息。看来我可以使用pauseUntilOther()或者then()不知何故,但我不太清楚我的代码和具体用例是如何给出的。任何想法或建议表示赞赏,谢谢。

主要发布者代码:

this.kafkaReceiver.receive()
        .groupBy(m -> m.receiverOffset().topicPartition())
        .flatMap(partitionFlux ->
                partitionFlux.publishOn(this.scheduler)
                        .flatMap(this::processEvent)
                        .sample(Duration.ofMillis(10))
                        .concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
        .subscribe();

通过调用返回的串联 KafkaSenders processEvent()

return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
        .doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event);

标签: project-reactorreactorreactor-kafka

解决方案


听起来像是Flux.last()您正在寻找的东西:

return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
    .doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event)
    .last();

然后你.sample(Duration.ofMillis(10))会做任何事情作为最后一个项目从一个或几个批次发送给这些经纪人。最后,您commitReceiverOffset()将正确地提交最后的任何内容。

有关更多信息,请参阅其 JavaDocs:

/**
 * Emit the last element observed before complete signal as a {@link Mono}, or emit
 * {@link NoSuchElementException} error if the source was empty.
 * For a passive version use {@link #takeLast(int)}
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/last.svg" alt="">
 *
 * <p><strong>Discard Support:</strong> This operator discards elements before the last.
 *
 * @return a {@link Mono} with the last value in this {@link Flux}
 */
public final Mono<T> last() {

和大理石图:https ://projectreactor.io/docs/core/release/api/reactor/core/publisher/doc-files/marbles/last.svg


推荐阅读