首页 > 解决方案 > 配置 Akka Alpakka kafka 以调查挂起的消费者

问题描述

我想正确有效地记录 Kafka 流以调查我的用例,如下所示。

我试图解决的问题在 github https://github.com/akka/alpakka-kafka/issues/899上公开为一个错误,但可能是我做错了什么,我正在尝试调查.

简而言之,每当我从属于同一个消费者组的多个消费者的一组主题中消费时(该组的所有消费者都订阅同一组主题),只有一半的消费者真正执行消费,另一个只是挂起.

日志的详细信息在票证中。

所以我想了解为什么那些消费者没有得到任何数据。确切的问题是什么,为什么他们继续记录:FETCH_SESSION_ID_NOT_FOUND

我想将我的日志记录集中在足够的细节上以理解这一点,而不是过多的细节导致难以发现问题的根源。

奇怪的是,此时调整 Akka 上的调试不起作用。只有当我在 Apache 上打开它时,我才会得到调试结果,但是它们太多了。

这是我的日志记录配置方案:

在我的 application.conf 我有

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = ${?AKKA_LOG_LEVEL}
  loglevel = "INFO"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}

在我的 logBack.xml 我有

<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="org.apache" level="${APACHE_LOG_LEVEL:-INFO}"/>
    <logger name="com.elsevier.entellect" level="${APP_LOG_LEVEL:-INFO}"/>
    <logger name="akka" level="${AKKA_LOG_LEVEL:-INFO}"/>

    <root level="${ROOT_LOG_LEVEL:-INFO}">
        <appender-ref ref="STDOUT" />
    </root>

</configuration>

我通过 Kubernetes 部署并根据需要注入环境变量。

将 AKKA_LOG_LEVEL 设置为 DEBUG,完全没有区别。

但是,将 APACHE_LOG_LEVEL 设置为 OFF、INFO 或 DEBUG 会有所不同。然而,在 INFO,我有关于 KAFKA 的基本信息,如 Github 上的帖子。如果我放 DEBUG 那么我得到的东西太多了。

更具体地说,帮助找出我需要设置哪个级别的记录器以至少捕获挂起的消费者正在发生的事情?他们是否提出请求但没有得到任何东西,是否存在重新平衡问题?

注意我的消费者的配置:

val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(conf.kafkaBroker.bootstrapServers)
      .withGroupId(conf.kafkaConsumer.groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, conf.kafkaConsumer.offsetReset)
      .withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1800000")
      .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000")
      .withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "60000")
      .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000000")
      .withProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10000")

此设置特定于我们对那些消费者的工作量,他们必须执行很长时间的操作。

标签: scalaapache-kafkaakka-streamalpakka

解决方案


推荐阅读