首页 > 解决方案 > 出现错误时如何重试kafka消息-spring cloud stream

问题描述

我对卡夫卡很陌生。我正在使用 Spring Cloud Stream Kafka 来生产和消费

@StreamListener(Sink.INPUT)
 public void process(Order order) {

       try {
      // have my message processing 

    }
    catch( exception e ) {
       //retry here that record..
     }
    }
 }

只想知道如何实现重试?对此的任何帮助都非常感谢

标签: springapache-kafkastream

解决方案


有多种方法可以处理“重试”,这取决于您遇到的事件类型。

对于基本问题,kafka 框架将重试以使您从错误条件中恢复,例如在网络短时间停机的情况下,消费者和生产者 api 会实现自动重试。

特别是 kafka 支持“内置生产者/消费者重试”以正确处理各种错误而不会丢失消息,但作为开发人员,您仍然必须能够使用您提到的 try-catch 块处理其他类型的错误.

kafka中的错误可以分为以下几类:

  • (生产者和消费者端)不可重试的代理错误,例如有关消息大小的错误、授权错误等 -> 您必须在应用程序的“设计阶段”处理它们。
  • (生产者端)在消息发送到代理之前发生的错误——例如,序列化错误——>您必须在运行时应用程序执行中处理它们
  • (生产者和消费者端错误发生在生产者用尽所有重试尝试或生产者使用的可用内存由于在重试时使用所有内存来存储消息而被填充到限制时->您应该处理这些错误。

关于“如何重试”的另一个注意点是在自动提交选项设置为 false 的情况下如何正确处理提交的顺序。

获得正确提交顺序的一种常见且简单的模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到提交函数中。

当您准备发送重试时,检查回调获得的提交序列号是否等于实例变量;如果是,则没有更新的提交,可以安全地重试。如果实例序列号更高,请不要重试,因为已经发送了更新的提交。


推荐阅读