java - 无限通量和批量写入数据库
问题描述
在继续实际事件处理之前,我有一个无限的 Flux(来自使用 reactor-kafka 的 kafka)的事件,我试图将这些事件批量写入数据库。我的问题是让它在适当的背压下工作。
windowTimeout
并且bufferTimeout
似乎是不错的候选人,因为他们允许我指定最大大小,但也限制了在“流量”低的情况下等待的时间。
首先是windowTimeout
,从那里对数据库进行批量写入。然而,这很快就出现了问题:reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...))。
然后我切换到bufferTimeout
,但没有成功,出现错误reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests。
我希望以下说明我所追求的流程:
flux.groupBy(envelope -> envelope.partition)
.flatMap(partitionFlux -> {
final Flux<ConsumedEnvelope> elasticFlux = partitionFlux.publishOn(Schedulers.elastic());
final Flux<List<ConsumedEnvelope>> batchFlux = partitionFlux.bufferTimeout(100, Duration.ofMillis(500))
.concatMap(batch -> {
final ConsumedEnvelope last = batch.get(batch.size() - 1);
return repository.persist(batch) // a)
.then(last.acknowledge()) // b)
.thenReturn(batch);
});
return processing(batchFlux);
})
.subscribe(result -> {
// ...
});
a)repository.persist
在内部什么都不做,只是迭代批处理以创建插入操作,然后返回 a Mono<Void>
。
b) ConsumedEnvelope.acknowledge() 用于 Kafka 偏移,我只想在成功持久化批处理后才执行此操作。所有这些都包含在concatMap
每个分区一次只处理一个批次中。
如上所述,这会导致溢出异常。是否有任何惯用的方式来实现我试图描述的内容?在我看来,这不应该是一个非常罕见的任务,但我是反应堆的新手,很想得到一些建议。
/d
编辑我意识到简单地添加onBackpressureBuffer
实际上可以为我解决这个问题。但总的来说,有没有更好的方法来做到这一点?
编辑 2 ...由于不受约束的需求,上述内容确实引起了问题,我不知何故错过了。所以,回到最初的问题,或者也许是某种方式让 onBackpressureBuffer 不请求未绑定的需求,而只转发下游请求的内容。
解决方案
推荐阅读
- html - 如何完美地将长文本放入 CSS 中的圆圈内
- javascript - 是否可以单击具有动态内容的页面上的每个元素?
- laravel-5 - 传递给 Illuminate\Database\Query\Builder::insert() 的参数 1 必须是数组类型,给定 null
- selenium - 当可以使用 Selenium 和 Java 找到任何一个元素时如何选择一个元素
- php - LiveChatMessages API YouTube 返回 null
- android - 在 Android 上以编程方式获取 EMDK 版本
- javascript - 在工具提示中将秒转换为时间
- python - 有没有更简洁的方法来读取多个 *.csv 文件并添加一列然后添加一个 for 循环?
- ajax - Laravel ajax URL 在本地完美地返回 404 错误
- spring-security - @PreAuthorize hasRole 使用属性注入的值