scala - 以编程方式停止 Alpakka Kafka 流的正确方法
问题描述
我们正在尝试使用 Akka Streams 和 Alpakka Kafka 来消费服务中的事件流。为了处理事件处理错误,我们使用 Kafka 自动提交和多个队列。例如,如果我们有user_created
想要从产品服务中消费的主题,我们也会创建user_created_for_products_failed
和user_created_for_products_dead_letter
。这两个额外的主题与特定的 Kafka 消费者组耦合。如果一个事件处理失败,它会进入失败队列,我们会在五分钟内再次尝试消费——如果再次失败,它就会进入死信。
在部署时,我们希望确保我们不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但所有这些“飞行”的事件都尚未处理。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。
阅读文档后,我们已经看到了该KillSwitch
功能。我们在其中看到的问题是该shutdown
方法按我们的预期Unit
返回。Future[Unit]
我们不确定使用它是否会丢失事件,因为在测试中它看起来太快而无法正常工作。
ActorSystem
作为一种解决方法,我们为每个流创建一个并使用该terminate
方法(返回 a Future[Terminate]
)。这个解决方案的问题是,我们认为创建ActorSystem
每个流不会很好地扩展,并且terminate
需要很长时间才能解决(在测试中,关闭最多需要一分钟)。
你遇到过这样的问题吗?是否有更快的方法(与 相比ActorSystem.terminate
)来停止流并确保Source
已处理所有已发出的事件?
解决方案
从文档(强调我的):
使用外部偏移存储时,调用
Consumer.Control.shutdown()
就足以完成Source
,这将启动流的完成。
val (consumerControl, streamComplete) =
Consumer
.plainSource(consumerSettings,
Subscriptions.assignmentWithOffset(
new TopicPartition(topic, 0) -> offset
))
.via(businessFlow)
.toMat(Sink.ignore)(Keep.both)
.run()
consumerControl.shutdown()
Consumer.control.shutdown()
返回一个Future[Done]
。从它的 Scaladoc 描述中:
关闭消费者
Source
。它将在关闭之前等待未完成的偏移提交请求完成。
或者,如果您在 Kafka 中使用偏移存储,请使用Consumer.Control.drainAndShutdown
,它也返回一个Future
. 再次来自文档(其中包含有关幕后drainAndShutdown
工作的更多信息):
val drainingControl =
Consumer
.committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
.mapAsync(1) { msg =>
business(msg.record).map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
val streamComplete = drainingControl.drainAndShutdown()
Scaladoc 描述drainAndShutdown
:
停止从 生成消息
Source
,等待流完成并关闭消费者Source
,以便所有消费的消息都到达流的末尾。流完成中的失败将被传播,无论如何源将被关闭。
推荐阅读
- javascript - 对象不支持“延迟”属性或方法
- .net - Winforms:防止自动绘制子控件
- reactjs - 错误:当目标不是服务器时无法导出。在部署期间使用下一个 js 和 zeit-NOW
- vb.net - 在 Windows 10 下运行的应用程序在字符串中找到 byteOrderMarkUtf8,但字符串中没有
- javascript - 如何使用异步剪贴板 API 从 chrome 扩展复制文本
- powershell - 带有约束的 Powershell 随机获取
- ruby-on-rails - pg gem 不会为 Heroku 部署安装
- r - R:从 Google 搜索结果中获取超出首页的链接
- assembly - 如何使用库 emu8086.inc 打印除法的其余部分
- node.js - 如何访问 Wordpress 身份验证令牌