首页 > 解决方案 > Storm KafkaSpout如何知道所有的bolts都被执行了

问题描述

例如我的拓扑代码是这样的:

builder.setSpout("spout", new KafkaSpout);
builder.setBolt("bolt1", new Bolt1).shuffleGrouping("spout");
builder.setBolt("bolt2", new Bolt2).shuffleGrouping("bolt1");
builder.setBolt("bolt3", new Bolt3).shuffleGrouping("bolt2");

当bolt1 发出时,消息将被自动确认。但是当bolt2或bolt3出现异常时,无法重发这条消息,如何找回失败的消息?

标签: apache-kafkaapache-stormkafka-consumer-api

解决方案


Storm 有tuple trees掌舵它的概念。让我尝试使用问题中提供的示例进行解释。

当您spout调用该collector.emit方法时,新发出的元组,我们称之为它tuple1,被添加到tuple tree. 这个元组bolt1在订阅它时到达,并将接收从spout. 一旦它tuple1在方法中接收到输入execute,在处理输入之后,就会发出一个新值,tuple2该值将添加到 之后的元组树中tuple1。在退出该execute方法之前,通过隐式调用来确认元组,collector.ack这告诉风暴tuple1已经处理,请将其从元组树中删除,现在保留tuple2它传递给bolt2处理。

现在问题出现了,如果bolt1由于某种原因无法确认会发生什么。Storm 会看到经过一段时间,也就是拓扑超时时间(默认为 30 秒)后,元组树还没有耗尽,因此它会从头开始重播元组,并遵循上述相同的过程。

希望我能够解释失败时会发生什么。有关更多详细信息,请阅读内容或观看此内容


推荐阅读