java - 如果 Kafka 服务器不可用,则终止 Spring Kafka 事务
问题描述
在我们的项目中,我们需要通过多个线程定期从 3rd 方获取数据,然后将这些数据推送到 Kafka。如果此时 Kafka 服务器不可用,则应终止流程,获取的数据应丢失并在下一次计划执行期间重新获取。此外,还需要使用事务管理,因为我们需要将这批消息发送到 Kafka 到不同的主题。如果没有发送一条消息,则应回滚所有其他消息。
如果 Kafka 服务器不可用,我们会遇到终止执行的问题。当事务管理被禁用时,一切正常,我们得到
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
但是启用事务管理后,Kafka 生产者会尝试无限地访问服务器,并且所有发起将消息推送到 Kafka 的线程都会坚持。我们尝试了不同的设置以使其在一段时间不可用后失败,但没有帮助。设置spring.kafka.producer.retries: 0
或spring.kafka.producer.acks: 0
原因(不是完整的堆栈跟踪):
Caused by: org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.
Caused by: org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
当 Kafka 服务器不可用且启用事务管理时,是否存在某些设置组合使 Kafka Producer 在超时后失败?这可能吗?
解决方案
Spring-Kafka 用于DefaultAfterRollbackProcessor
寻找失败的偏移量并重试 - 这将继续循环,直到偏移量被正确处理。这是默认行为。如果事务失败,您将回滚,这取决于rollbackFor
.@Transactional
您有一个特殊情况,如果 Kafka 服务器不可用,它将回滚。您可以通过实现来创建自己的处理器AfterRollbackProcessor
。由于连接超时,您需要区分常规回滚和回滚。
编辑:
您还可以定义要排除noRollbackFor
的属性并让此异常到达. 您可以创建自定义 ExceptionHandler 并在容器上使用。您可以在此处阅读有关容器错误处理程序的信息@Transactional
TimeoutException
Container
setErrorHandler(..)
推荐阅读
- ios - 如果它们的总宽度大于 UICollectionView 宽度,则剪辑最后一个 UICollectionViewCell
- javascript - 向图表添加垂直线
- ruby-on-rails - Rails Active Record 关联查询
- vb.net - 附加到 DataGridView 的 DataTable,不填充现有列
- inheritance - NSwag 继承和多态性
- python - Pyspark UDF 不工作,期望字符串不是列
- php - PHP file_get_contents 在 CentOS 上不起作用
- json - pyspark json 读取以标记不良记录
- python - 如果循环 x 次,计算列表项通过次数的公式是什么
- sql - SQL 查询:计算每个用户每月总计数的百分比