project-reactor - Reactor Flux - 仅在完成时从 Publisher 发出
问题描述
我有一些 Reactor Kafka 代码,它通过 a 读取事件KafkaReceiver
并通过 1 或多个KafkaSenders
连接到单个Publisher
. 一切都很好,但我想做的只是Flux
在完成时从这个连接的发送者发出一个事件(即,它已完成写入任何给定事件的所有下游主题,因此它不会为每个元素发出任何内容它向下游写入 Kafka 直到完成)。通过这种方式,我可以sample()
并定期提交偏移量,因为我知道每当发生这种sample()
情况时,我都会为传入事件提交偏移量,我已经为每个要提交偏移量的事件处理了所有下游消息。看来我可以使用pauseUntilOther()
或者then()
不知何故,但我不太清楚我的代码和具体用例是如何给出的。任何想法或建议表示赞赏,谢谢。
主要发布者代码:
this.kafkaReceiver.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(this.scheduler)
.flatMap(this::processEvent)
.sample(Duration.ofMillis(10))
.concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
.subscribe();
通过调用返回的串联 KafkaSenders processEvent()
:
return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
.doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event);
解决方案
听起来像是Flux.last()
您正在寻找的东西:
return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
.doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event)
.last();
然后你.sample(Duration.ofMillis(10))
会做任何事情作为最后一个项目从一个或几个批次发送给这些经纪人。最后,您commitReceiverOffset()
将正确地提交最后的任何内容。
有关更多信息,请参阅其 JavaDocs:
/**
* Emit the last element observed before complete signal as a {@link Mono}, or emit
* {@link NoSuchElementException} error if the source was empty.
* For a passive version use {@link #takeLast(int)}
*
* <p>
* <img class="marble" src="doc-files/marbles/last.svg" alt="">
*
* <p><strong>Discard Support:</strong> This operator discards elements before the last.
*
* @return a {@link Mono} with the last value in this {@link Flux}
*/
public final Mono<T> last() {
和大理石图:https ://projectreactor.io/docs/core/release/api/reactor/core/publisher/doc-files/marbles/last.svg
推荐阅读
- javascript - 如何使用 Javascript 将实例添加到实例化对象?
- javascript - jsPDF返回空白页
- forms - 控件在设计器中引发了未处理的异常并且已被禁用错误值不能为空
- javascript - 我无法映射数组“产品”(React/Redux)
- image - 使用 ImageMagick 生成多层 PSD
- java - 如何在 java.io.file 中创建文件?
- android - Flutter:如何暂停和重新启动“Stream”侦听器?
- android - 如何使用 GraalVM 从 JVM 工件交叉编译共享库?
- sas - SAS VA 使用逻辑回归创建预测模型
- neo4j - Cypher 选择任何邻居不包含属性的顶点