首页 > 解决方案 > 以编程方式停止 Alpakka Kafka 流的正确方法

问题描述

我们正在尝试使用 Akka Streams 和 Alpakka Kafka 来消费服务中的事件流。为了处理事件处理错误,我们使用 Kafka 自动提交和多个队列。例如,如果我们有user_created想要从产品服务中消费的主题,我们也会创建user_created_for_products_faileduser_created_for_products_dead_letter。这两个额外的主题与特定的 Kafka 消费者组耦合。如果一个事件处理失败,它会进入失败队列,我们​​会在五分钟内再次尝试消费——如果再次失败,它就会进入死信。

在部署时,我们希望确保我们不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但所有这些“飞行”的事件都尚未处理。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。

阅读文档后,我们已经看到了该KillSwitch功能。我们在其中看到的问题是该shutdown方法按我们的预期Unit返回。Future[Unit]我们不确定使用它是否会丢失事件,因为在测试中它看起来太快而无法正常工作。

ActorSystem作为一种解决方法,我们为每个流创建一个并使用该terminate方法(返回 a Future[Terminate])。这个解决方案的问题是,我们认为创建ActorSystem每个流不会很好地扩展,并且terminate需要很长时间才能解决(在测试中,关闭最多需要一分钟)。

你遇到过这样的问题吗?是否有更快的方法(与 相比ActorSystem.terminate)来停止流并确保Source已处理所有已发出的事件?

标签: scalaapache-kafkaakkaakka-streamalpakka

解决方案


文档(强调我的):

使用外部偏移存储时,调用Consumer.Control.shutdown()就足以完成Source,这将启动流的完成。

val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
                 Subscriptions.assignmentWithOffset(
                   new TopicPartition(topic, 0) -> offset
                 ))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()

Consumer.control.shutdown()返回一个Future[Done]。从它的 Scaladoc 描述中:

关闭消费者Source。它将在关闭之前等待未完成的偏移提交请求完成。

或者,如果您在 Kafka 中使用偏移存储,请使用Consumer.Control.drainAndShutdown,它也返回一个Future. 再次来自文档(其中包含有关幕后drainAndShutdown工作的更多信息):

val drainingControl =
  Consumer
    .committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record).map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()

Scaladoc 描述drainAndShutdown

停止从 生成消息Source,等待流完成并关闭消费者Source,以便所有消费的消息都到达流的末尾。流完成中的失败将被传播,无论如何源将被关闭。


推荐阅读