首页 > 解决方案 > 如果 Kafka 服务器不可用,则终止 Spring Kafka 事务

问题描述

在我们的项目中,我们需要通过多个线程定期从 3rd 方获取数据,然后将这些数据推送到 Kafka。如果此时 Kafka 服务器不可用,则应终止流程,获取的数据应丢失并在下一次计划执行期间重新获取。此外,还需要使用事务管理,因为我们需要将这批消息发送到 Kafka 到不同的主题。如果没有发送一条消息,则应回滚所有其他消息。

如果 Kafka 服务器不可用,我们会遇到终止执行的问题。当事务管理被禁用时,一切正常,我们得到

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

但是启用事务管理后,Kafka 生产者会尝试无限地访问服务器,并且所有发起将消息推送到 Kafka 的线程都会坚持。我们尝试了不同的设置以使其在一段时间不可用后失败,但没有帮助。设置spring.kafka.producer.retries: 0spring.kafka.producer.acks: 0原因(不是完整的堆栈跟踪):

Caused by: org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.

Caused by: org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

当 Kafka 服务器不可用且启用事务管理时,是否存在某些设置组合使 Kafka Producer 在超时后失败?这可能吗?

标签: javaapache-kafkaspring-kafka

解决方案


Spring-Kafka 用于DefaultAfterRollbackProcessor寻找失败的偏移量并重试 - 这将继续循环,直到偏移量被正确处理。这是默认行为。如果事务失败,您将回滚,这取决于rollbackFor.@Transactional

您有一个特殊情况,如果 Kafka 服务器不可用,它将回滚。您可以通过实现来创建自己的处理器AfterRollbackProcessor。由于连接超时,您需要区分常规回滚和回滚。

编辑: 您还可以定义要排除noRollbackFor的属性并让此异常到达. 您可以创建自定义 ExceptionHandler 并在容器上使用。您可以在此处阅读有关容器错误处理程序的信息@TransactionalTimeoutExceptionContainersetErrorHandler(..)


推荐阅读