scala - 反应性 akka 流:如何延迟图形关闭直到源干涸?
问题描述
我正在尝试将数据(比如 10 个字符串)从 kafka 经纪人提供给 akka-stream 图
这是 unittest 中的图表:
val consumer = AlpakkaConsumer(KafkaBootstrapServer(kafkaURL).value, "porteur-id")
val drainingControl =
Consumer
.plainSource(consumer.kafkaConsumerSettings, Subscriptions.topics("transaction"))
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
val streamComplete = drainingControl.drainAndShutdown()
Await.result(streamComplete, Duration.Inf).size should be > 0
单元测试失败(大小等于 0,预期 10)
但是,如果我Thread.sleep(5000)
在 drainAndShutdown() 调用之前插入一个,它就会通过。
它给我的印象是,在没有睡眠的情况下,图形在 run() 调用后直接关闭,甚至没有时间处理第一条消息。如果我引入一个 sleep 命令,它可以遍历所有源内容,它关闭图表,最后我得到了一些东西。
我看不出我做错了什么,因为它基本上是一个片段,你可以在任何地方找到,并用作示例
仅当源已传递所有消息时,如何触发 drainAndShutDown 调用?
谢谢你的帮助 !!
注意:我使用的是 akka-stream-kafka 0.22 版本
解决方案
根据 Alpakka Kafka 的文档,drainAndShutdown
行为是:
停止从 Source 产生消息,等待流完成并关闭消费者 Source,以便所有消费的消息到达流的末尾。
这意味着您将停止在您的消费者中接收消息,它只会处理已经入队的消息。
对于测试流,我建议您看一下Streams TestKit,因为它可以让您对要测试的流的哪些部分进行大量控制
推荐阅读
- autodesk-forge - 如何将输入(文本框、滑块等)添加到 Autodesk Forge 查看器
- windows - 使用 CURL Windows 发布 json
- regex - 文件中的 SED 替换
- c# - 如何在 c# 中使用 selenium sendkey 传递 url?
- python - 密集层的输入 0 与该层不兼容:输入形状的预期轴 -1 具有值 8192,但接收到的输入具有形状(无,61608)
- r - 列中的计算取决于 R 中先前的观察结果
- android - AppCompatSpinner 仅在少数设备上抛出 InflateException
- graphql - 根据其他未请求的字段返回自定义字段?
- python - 返回语句不打印类中的输出(特定)
- javascript - chrome 扩展 - 创建新的弹出窗口会导致一个空白窗口,直到我移动它