apache-kafka - 如何使用 Spring Reactive Kafka 实现重试和恢复逻辑
问题描述
我们正在使用https://github.com/reactor/reactor-kafka项目来实现 Spring Reactive Kafka。但是我们想利用 Kafka 重试和恢复反应式 Kafka 的逻辑。谁能提供一些示例代码?
解决方案
由于您使用 spring 生态系统进行重试和恢复,您可以使用 spring-retry 查看文档spring -retry。网上有足够的参考资料。
下面的示例示例正在使用来自 kafka 主题的消息并进行处理。
消费的方法标记为Retryable,所以如果有异常处理会重试,如果重试不成功则调用相应的恢复方法。
public class KafkaListener{
@KafkaListener(topic="books-topic", id ="group-1")
@Retryable(maxAttempts = 3, value = Exception.class))
public void consuming(String message){
// To do message processing
// Whenever there is exception thrown from this method
// - it will retry 3 times in total
// - Even after retry we get exception then it will be handed of to below
// recover method recoverConsuming
}
@Recover
public void recoverConsuming(Exception exception, String message){
// Recovery logic
// you can implement your recovery scenario
}
}
推荐阅读
- ios - iOS 的 MotionLayout
- multithreading - 队列的实现不是线程安全的
- c - 如何强制 gnu ld 使用 -lc 中的符号覆盖我自己的 libc 副本中的符号?
- java - 来自 TextField 的 SimpleIntegerProperty
- javascript - 为脚本添加仅在 Travis CI 中运行而不是在本地运行的权限
- matlab - 无法实例化派生的 MATLAB 类而不会出错
- node.js - Firebase Web Init 错误文件已存在
- javascript - JS - D3 多线时间序列图
- git - 每次提交都会在 git 中创建一个新的树对象吗?
- keras - 在 Keras 中使用 ModelCheckpoint 时的最大递归深度错误