scala - 如何让 Play 应用程序中的 Scala Kafka Consumer 在应用程序的整个生命周期中持续监听代理?
问题描述
我正在尝试创建一个 Play-Scala 应用程序,该应用程序使用 Scala Kafka Consumer 来监听 Kafka 代理。我正在使用 Cake Solutions Scala Kafka 客户端库,并在此处遵循他们的示例。
我创建了一个包含类来充当 Kafka 消费者提供者,并将它绑定为一个渴望的单例,以便在应用程序启动时创建它。
问题是消费者会在应用程序启动时监听代理,但之后不会。
这是我的 ConsumerProvider 代码:
trait KafkaConsumerProvider {
def consumer: ActorRef
}
@Singleton
class KafkaConsumerProviderImpl @Inject() (actorSystem: ActorSystem, configuration: Configuration)
extends KafkaConsumerProvider {
private val consumerConf: KafkaConsumer.Conf[String, String] = KafkaConsumer.Conf(
keyDeserializer = new StringDeserializer,
valueDeserializer = new StringDeserializer,
bootstrapServers = configuration.get[String]("messageBroker.bootstrapServers"),
groupId = configuration.get[String]("messageBroker.consumer.groupId"),
enableAutoCommit = false,
autoCommitInterval= 1000,
sessionTimeoutMs = 10000,
maxPartitionFetchBytes = ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
maxPollRecords = 500,
maxPollInterval = 300000,
maxMetaDataAge = 300000,
autoOffsetReset = OffsetResetStrategy.LATEST,
isolationLevel = IsolationLevel.READ_UNCOMMITTED,
)
private val actorConf: KafkaConsumerActor.Conf = KafkaConsumerActor.Conf(
scheduleInterval = 1.seconds, // scheduling interval for Kafka polling when consumer is inactive
unconfirmedTimeout = 3.seconds, // duration for how long to wait for a confirmation before redelivery
maxRedeliveries = 3 // maximum number of times a unconfirmed message will be redelivered
)
override val consumer: ActorRef = {
val receiverActor = actorSystem.actorOf(ReceiverActor.props)
val topics = configuration.get[String]("messageBroker.consumer.topics").split(",").toSeq
val _consumer = actorSystem.actorOf(KafkaConsumerActor.props(consumerConf, actorConf, receiverActor))
_consumer ! Subscribe.AutoPartition(topics)
_consumer
}
}
这就是我如何将依赖项绑定为一个渴望的单例Module.scala
:
class Module extends AbstractModule with ScalaModule {
override def configure(): Unit = {
bind[KafkaMessageBrokerWriter].to[KafkaMessageBrokerWriterImpl].asEagerSingleton()
bind[KafkaConsumerProvider].to[KafkaConsumerProviderImpl].asEagerSingleton()
}
}
如何让消费者继续倾听?
解决方案
问题是,在 中ReceiverActor
,我忘记确认偏移量:
sender() ! Confirm(records.offsets)
推荐阅读
- wpf - 如何在我的 DataGrid 上创建第二行带有 TextBoxes 的标题?
- gmail - 如何使用 webhook 向谷歌聊天室发送电子邮件
- azure - 自动化 Azure Cloud Shell 配置取消配置存储帐户
- python - 如何使用 OR 运算符解压缩 Elasticserach 的列表?
- python - Python:将lambda应用于一行中列表的每个元素
- python - 从 API 响应列表中获取数据
- java - Java Matcher 类实现 toFindResult()
- ios - 错误构建 Xcode - 真实设备上的颤振项目
- sql - 计算最大日期和选定日期之间的差异
- node.js - docker-compose - NODE_ENV 不适用