apache-kafka - Reactor - Kafka - 消费者在处理消息时因错误而停止
问题描述
我正在使用 Reactor-Kafka 1.2.4,目前面临的问题是在处理消息时遇到错误,Kafka 消费者停止并且不会继续处理其他消息。
我目前只想记录错误消息并继续处理下一条消息,稍后将添加 DLQ 处理等。
我已经将 Reactor-Kafka 升级到 1.3.0,另外,下面的代码是否足够好?
KafkaReceiver
.receive()
.retry(3)
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.immediate())
.subscribe(this::processRecord);
private void processRecord(ReceiverRecord<String, M> record) {
try {
<Process record>
} catch (Exception e) {
log.error("Error thrown {} while consuming message", e.getMessage());
<Later on Publish to Dead letter Queue>
} finally {
record.receiverOffset().acknowledge();
log.debug("Successfully acknowledged kafka message");
}
}
谢谢你。
解决方案
推荐阅读
- flutter - 使用contacts_service进行新更新后应用程序崩溃
- django - “str”对象没有“已验证”属性
- javascript - 如何更改二叉树索引中递归的保留?
- fastlane - 快车道:找不到动作、车道或变量“”
- c# - 实体框架:从迁移更改中排除审计属性
- css - 在 div 上设置菜单(z-index)
- javascript - 如果 innerHTML 有点长,来自 reactstrap 的不受控制的工具提示会崩溃
- laravel - Laravel: Argument 2 passed to Illuminate\Auth\SessionGuard::__construct()
- jquery - 一次打开一个手风琴
- go - 登录到 stderr 和 stdout golang Google Cloud Platform