首页 > 解决方案 > 反应性 akka 流:如何延迟图形关闭直到源干涸?

问题描述

我正在尝试将数据(比如 10 个字符串)从 kafka 经纪人提供给 akka-stream 图

这是 unittest 中的图表:

val consumer = AlpakkaConsumer(KafkaBootstrapServer(kafkaURL).value, "porteur-id")

val drainingControl =
  Consumer
    .plainSource(consumer.kafkaConsumerSettings, Subscriptions.topics("transaction"))
    .toMat(Sink.seq)(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()
Await.result(streamComplete, Duration.Inf).size should be > 0

单元测试失败(大小等于 0,预期 10)

但是,如果我Thread.sleep(5000)在 drainAndShutdown() 调用之前插入一个,它就会通过。

它给我的印象是,在没有睡眠的情况下,图形在 run() 调用后直接关闭,甚至没有时间处理第一条消息。如果我引入一个 sleep 命令,它可以遍历所有源内容,它关闭图表,最后我得到了一些东西。

我看不出我做错了什么,因为它基本上是一个片段,你可以在任何地方找到,并用作示例

仅当源已传递所有消息时,如何触发 drainAndShutDown 调用?

谢谢你的帮助 !!

注意:我使用的是 akka-stream-kafka 0.22 版本

标签: scalaapache-kafkaakka-streamalpakka

解决方案


根据 Alpakka Kafka 的文档drainAndShutdown行为是:

停止从 Source 产生消息,等待流完成并关闭消费者 Source,以便所有消费的消息到达流的末尾。

这意味着您将停止在您的消费者中接收消息,它只会处理已经入队的消息。

对于测试流,我建议您看一下Streams TestKit,因为它可以让您对要测试的流的哪些部分进行大量控制


推荐阅读