spring - 出现错误时如何重试kafka消息-spring cloud stream
问题描述
我对卡夫卡很陌生。我正在使用 Spring Cloud Stream Kafka 来生产和消费
@StreamListener(Sink.INPUT)
public void process(Order order) {
try {
// have my message processing
}
catch( exception e ) {
//retry here that record..
}
}
}
只想知道如何实现重试?对此的任何帮助都非常感谢
解决方案
海
有多种方法可以处理“重试”,这取决于您遇到的事件类型。
对于基本问题,kafka 框架将重试以使您从错误条件中恢复,例如在网络短时间停机的情况下,消费者和生产者 api 会实现自动重试。
特别是 kafka 支持“内置生产者/消费者重试”以正确处理各种错误而不会丢失消息,但作为开发人员,您仍然必须能够使用您提到的 try-catch 块处理其他类型的错误.
kafka中的错误可以分为以下几类:
- (生产者和消费者端)不可重试的代理错误,例如有关消息大小的错误、授权错误等 -> 您必须在应用程序的“设计阶段”处理它们。
- (生产者端)在消息发送到代理之前发生的错误——例如,序列化错误——>您必须在运行时应用程序执行中处理它们
- (生产者和消费者端错误发生在生产者用尽所有重试尝试或生产者使用的可用内存由于在重试时使用所有内存来存储消息而被填充到限制时->您应该处理这些错误。
关于“如何重试”的另一个注意点是在自动提交选项设置为 false 的情况下如何正确处理提交的顺序。
获得正确提交顺序的一种常见且简单的模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到提交函数中。
当您准备发送重试时,检查回调获得的提交序列号是否等于实例变量;如果是,则没有更新的提交,可以安全地重试。如果实例序列号更高,请不要重试,因为已经发送了更新的提交。
推荐阅读
- windows - 无法获取任务列表批处理文件以输出子字符串
- python - 在 Windows 上的 .cmd 文件中启动 Python VEnv
- python-3.x - 使用 value_counts() 创建数据帧
- java - 是对象[] $1d = { 新对象[0]; } 考虑一个二维数组?
- html - 如何在 HTML 和 CSS 中制作一个在保持居中的同时自动缩小的对话框?
- git - 将项目/模块直接添加到 .gitmodules
- flutter - Flutter 从另一个 Wideget 更新小部件中的列表视图
- html - 如何显示搜索结果?
- azure - 如何使用反向代理机制模拟 Azure Cosmos DB 和 Azure Redis 区域故障场景?
- php - 学生总分从高到低排序