首页 > 解决方案 > Kafka 指标似乎没有正确更新

问题描述

问题

我正在使用 Spring 的 Actuator Prometheus 端点输出的指标监控 Kafka 偏移滞后。在某些情况下,我注意到在执行大型回填作业时,我的一些节点(应用程序的 6 个实例)将停止更新指标,直到所有消息都被消耗完。有问题的节点仍在正确处理消息(如日志记录和最终结果所证明的那样)。在绘制我的指标时,这在偏移滞后中显示为一条平线,直到最后,当它们从悬崖边缘掉下时。

还值得注意的是,在指标按预期更新并且我们已经执行应用程序部署的情况下,我们可能会看到指标从按预期运行到一个或多个节点平坦,如上所述.

对我来说,这感觉像是某种不确定的竞争条件,可能通过我配置应用程序的方式来解释?

1

语境

我有一个使用多个主题的应用程序。除了一个主题之外,所有主题都具有共享消息格式,并且只有一个主题具有不同的格式,因为它是“重试”主题,因此消息是不同的。我还想将过滤器仅应用于常规主题,因此为了方便起见,我创建了两个单独的ConcurrentKafkaListenerContainerFactorybean,例如:

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageA> messageAContainerFactory(
        ConsumerFactory<Long, MessageA> consumerFactory) { ... }

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageB> messageBContainerFactory(
        ConsumerFactory<Long, MessageB> consumerFactory) { ... }

然后我在相关@KafkaListener注释中引用这些 bean:

@KafkaListener(
        topics = "message-a-topic",
        containerFactory = "messageAContainerFactory")
public void consumeMessageA(MessageA messageA) { ... }

@KafkaListener(
        topics = "message-B-topic",
        containerFactory = "messageBContainerFactory")
public void consumeMessageB(MessageB messageB) { ... }

除了这些豆子,没有什么其他的。Kafka 的应用程序配置如下所示:

spring:
  kafka:
    consumer:
      auto-offset-reset: latest
      max-poll-records: 10
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          deserializer:
            value:
              delegate:
                class: org.springframework.kafka.support.serializer.JsonDeserializer
          json:
            value:
              default:
                type: com.example.MessageA

我应该强调,我没有注意到该功能有任何问题 - 消息似乎按预期被使用和处理。这似乎只会影响指标的更新方式。

任何意见,将不胜感激!

标签: spring-bootspring-kafkaspring-micrometer

解决方案


推荐阅读