首页 > 解决方案 > 等待 submitToplogy 完成

问题描述

我正在阅读风暴应用书。我在书中找到了以下代码片段

LocalCluster lc = new LocalCluster()
lc.submitTopology("GitHub-commit-count-topology"), config, topology);
Utils.sleep(TEN_MINUTES)
lc.killTopology("GitHub-commit-count-topology")
lc.shutdown()

所以这段代码将提交拓扑执行等待固定10分钟,然后杀死拓扑。但这很奇怪。我怎么能说。submitTopology 等待它完成并完成。杀死并关闭。

就像在 Akka Streams 中我们得到的一样Future[Done],我们只是等待那个未来完成。(而不是固定的 10 分钟)。

标签: apache-storm

解决方案


您可以使用https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/Testing.java#L376来做到这一点。

在某些情况下不使用它的原因是它需要拓扑中的每个 spout 来实现 CompletableSpout 接口https://github.com/apache/storm/blob/4137328b75c06771f84414c3c2113e2d1c757c08/storm-client/src/jvm/org/ apache/storm/testing/CompletableSpout.java

大多数 Storm spout 从未达到“完成”的程度(因为它是流处理框架,而不是批处理框架),因此无法判断拓扑何时完成。例如,如果您正在消费来自 Kafka 主题的消息,生产者可能随时向该主题添加更多消息,那么消费者将如何确定它已完成消费?

CompletableSpout 的存在主要是为了简化测试,因为这样一个 spout 就可以判断它是否完成了。然后,我链接的 completeTopology 方法可以使用这个额外的功能来判断拓扑中的所有 spout 是否都“完成”,然后可以停止拓扑。

如果您在测试中使用的 spout 没有实现 CompletableSpout(大多数 spout 没有实现),则通常无法判断拓扑何时完成。在许多情况下,您仍然可以比您链接的示例做得更好,例如,如果我的拓扑应该在测试中将 10 条消息写入队列,我可以在将 10 条消息写入队列后结束测试。

与 Akka 流相关,我对它们不是很熟悉,但是查看介绍性文档,您可以认为 CompletableSpouts 类似于有界 Sources(例如 a Source(1 to 100)),而“正常” spout 是无界 Sources (例如 a Source.repeat(1)) .


推荐阅读