apache-storm - 等待 submitToplogy 完成
问题描述
我正在阅读风暴应用书。我在书中找到了以下代码片段
LocalCluster lc = new LocalCluster()
lc.submitTopology("GitHub-commit-count-topology"), config, topology);
Utils.sleep(TEN_MINUTES)
lc.killTopology("GitHub-commit-count-topology")
lc.shutdown()
所以这段代码将提交拓扑执行等待固定10分钟,然后杀死拓扑。但这很奇怪。我怎么能说。submitTopology 等待它完成并完成。杀死并关闭。
就像在 Akka Streams 中我们得到的一样Future[Done]
,我们只是等待那个未来完成。(而不是固定的 10 分钟)。
解决方案
您可以使用https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/Testing.java#L376来做到这一点。
在某些情况下不使用它的原因是它需要拓扑中的每个 spout 来实现 CompletableSpout 接口https://github.com/apache/storm/blob/4137328b75c06771f84414c3c2113e2d1c757c08/storm-client/src/jvm/org/ apache/storm/testing/CompletableSpout.java。
大多数 Storm spout 从未达到“完成”的程度(因为它是流处理框架,而不是批处理框架),因此无法判断拓扑何时完成。例如,如果您正在消费来自 Kafka 主题的消息,生产者可能随时向该主题添加更多消息,那么消费者将如何确定它已完成消费?
CompletableSpout 的存在主要是为了简化测试,因为这样一个 spout 就可以判断它是否完成了。然后,我链接的 completeTopology 方法可以使用这个额外的功能来判断拓扑中的所有 spout 是否都“完成”,然后可以停止拓扑。
如果您在测试中使用的 spout 没有实现 CompletableSpout(大多数 spout 没有实现),则通常无法判断拓扑何时完成。在许多情况下,您仍然可以比您链接的示例做得更好,例如,如果我的拓扑应该在测试中将 10 条消息写入队列,我可以在将 10 条消息写入队列后结束测试。
与 Akka 流相关,我对它们不是很熟悉,但是查看介绍性文档,您可以认为 CompletableSpouts 类似于有界 Sources(例如 a Source(1 to 100)
),而“正常” spout 是无界 Sources (例如 a Source.repeat(1)
) .
推荐阅读
- pine-script - 想将三元运算符转换为循环
- java - Apache Camel ThreadPoolExecutor(InMemorySagaService) RequestContext 问题
- php - HTTP 到 HTTPS 新 SSL
- rest-assured-jsonpath - 当我尝试匹配 json 数组中的双精度值时,它会失败,预期值周围有尖括号。这是什么意思?
- python - 如何在 Pandas 中使用缺失值的聚合函数
- python - 将空行转换为熊猫中的列
- swift - 如何在 UIActionSheet 中添加多行文本
- python - Python检查字符串是否有字符
- javascript - 如何在 mousemove 事件中确定鼠标的方向(左上、右上、左下和右下)
- html - 仅在移动设备上居中对齐文本