首页 > 解决方案 > 如何让 Play 应用程序中的 Scala Kafka Consumer 在应用程序的整个生命周期中持续监听代理?

问题描述

我正在尝试创建一个 Play-Scala 应用程序,该应用程序使用 Scala Kafka Consumer 来监听 Kafka 代理。我正在使用 Cake Solutions Scala Kafka 客户端库,并在此处遵循他们的示例。

我创建了一个包含类来充当 Kafka 消费者提供者,并将它绑定为一个渴望的单例,以便在应用程序启动时创建它。

问题是消费者会在应用程序启动时监听代理,但之后不会。

这是我的 ConsumerProvider 代码:

trait KafkaConsumerProvider {

  def consumer: ActorRef

}

@Singleton
class KafkaConsumerProviderImpl @Inject() (actorSystem: ActorSystem, configuration: Configuration)
    extends KafkaConsumerProvider {

  private val consumerConf: KafkaConsumer.Conf[String, String] = KafkaConsumer.Conf(
    keyDeserializer = new StringDeserializer,
    valueDeserializer = new StringDeserializer,
    bootstrapServers = configuration.get[String]("messageBroker.bootstrapServers"),
    groupId = configuration.get[String]("messageBroker.consumer.groupId"),
    enableAutoCommit = false,
    autoCommitInterval= 1000,
    sessionTimeoutMs = 10000,
    maxPartitionFetchBytes = ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
    maxPollRecords = 500,
    maxPollInterval = 300000,
    maxMetaDataAge  = 300000,
    autoOffsetReset = OffsetResetStrategy.LATEST,
    isolationLevel = IsolationLevel.READ_UNCOMMITTED,
  )

  private val actorConf: KafkaConsumerActor.Conf = KafkaConsumerActor.Conf(
    scheduleInterval = 1.seconds,   // scheduling interval for Kafka polling when consumer is inactive
    unconfirmedTimeout = 3.seconds, // duration for how long to wait for a confirmation before redelivery
    maxRedeliveries = 3             // maximum number of times a unconfirmed message will be redelivered
  )

  override val consumer: ActorRef = {
    val receiverActor = actorSystem.actorOf(ReceiverActor.props)
    val topics = configuration.get[String]("messageBroker.consumer.topics").split(",").toSeq
    val _consumer = actorSystem.actorOf(KafkaConsumerActor.props(consumerConf, actorConf, receiverActor))
    _consumer ! Subscribe.AutoPartition(topics)
    _consumer
  }

}

这就是我如何将依赖项绑定为一个渴望的单例Module.scala

class Module extends AbstractModule with ScalaModule {

  override def configure(): Unit = {
    bind[KafkaMessageBrokerWriter].to[KafkaMessageBrokerWriterImpl].asEagerSingleton()
    bind[KafkaConsumerProvider].to[KafkaConsumerProviderImpl].asEagerSingleton()
  }

}

如何让消费者继续倾听?

标签: scalaapache-kafkaplayframeworkkafka-consumer-api

解决方案


问题是,在 中ReceiverActor,我忘记确认偏移量:

sender() ! Confirm(records.offsets)

推荐阅读