project-reactor - 在reactor-kafka中立即生效?
问题描述
尝试实现一个重试记录处理器,该处理器在发生瞬时故障时回溯到最后一条记录的偏移量,而不是在不确定的时间内进入内部重试循环,并且如果错误持续足够长的时间,则冒着消费者会话超时和重新平衡的风险。
kafkaTemplate
.receive()
...
.concatMap(record -> {
var postResponses = mapper.transform(record.value())
.stream()
.map(this::post)
.toArray(Mono[]::new);
return Mono.when(postResponses)
.doOnSuccess(__ -> record.receiverOffset().acknowledge())
.onErrorResume(ex ->
kafkaTemplate.seek(record.receiverOffset().topicPartition(), record.offset())
.then(Mono.delay(someRetryinterval).then())
);
}, 0) // no prefetch!
.subscribe();
将最大轮询记录设置为 1,在此代码开始之前,一个主题中有两条记录,Mono。当一直失败时,我看到它反复寻找 0 和 1,而不是只读取第一条记录,直到它成功。
我错过了什么吗?如何在不切换到阻塞 API 的情况下完全序列化 poll 并寻求调用以处理记录,并根据需要进行尽可能多的重试?
期望具有 SeekToCurrentErrorHandler 的 @KafkaListener 的等效项也可以在 RK 中工作,但发现并非如此
解决方案
推荐阅读
- powerbi - 我的措施在日期过滤器的情况下不起作用
- react-native - 我应该如何在这个 fetch 方法中使用 asyncstorage.setItem?
- javascript - 无法在字符串文字运算符中获取索引的值,除了我尝试过的方式之外,还有什么方法可以获取该值?
- sql - 如何将三个表与其他两个返回的多个值行连接起来
- css - 响应式网站不适用于 Css 中的媒体查询
- react-native - “动作可能没有未定义的“类型”
- ruby-on-rails - Rails 在父子关系中访问子兄弟姐妹的最佳方式
- python - 如何在python中使用多个装饰器?
- flutter - 如何使用颤振从应用程序中导出文件中的csv?不使用简单的权限插件和permission_handler 插件
- excel - 查找具有相反重复项的单元格