apache-kafka - Storm KafkaSpout如何知道所有的bolts都被执行了
问题描述
例如我的拓扑代码是这样的:
builder.setSpout("spout", new KafkaSpout);
builder.setBolt("bolt1", new Bolt1).shuffleGrouping("spout");
builder.setBolt("bolt2", new Bolt2).shuffleGrouping("bolt1");
builder.setBolt("bolt3", new Bolt3).shuffleGrouping("bolt2");
当bolt1 发出时,消息将被自动确认。但是当bolt2或bolt3出现异常时,无法重发这条消息,如何找回失败的消息?
解决方案
Storm 有tuple trees
掌舵它的概念。让我尝试使用问题中提供的示例进行解释。
当您spout
调用该collector.emit
方法时,新发出的元组,我们称之为它tuple1
,被添加到tuple tree
. 这个元组bolt1
在订阅它时到达,并将接收从spout
. 一旦它tuple1
在方法中接收到输入execute
,在处理输入之后,就会发出一个新值,tuple2
该值将添加到 之后的元组树中tuple1
。在退出该execute
方法之前,通过隐式调用来确认元组,collector.ack
这告诉风暴tuple1
已经处理,请将其从元组树中删除,现在保留tuple2
它传递给bolt2
处理。
现在问题出现了,如果bolt1
由于某种原因无法确认会发生什么。Storm 会看到经过一段时间,也就是拓扑超时时间(默认为 30 秒)后,元组树还没有耗尽,因此它会从头开始重播元组,并遵循上述相同的过程。
推荐阅读
- gradle - gradle项目的依赖地狱
- python-3.x - 行字段中的零天数
- c++ - 定义 UnaryPredicate 时 void 类型的非法操作数
- python-3.x - 我的 keras 神经网络模型给了我 0.0000e+00 的准确率
- mongodb - 数据库查询所需的概念性帮助
- spring-boot - ElasticsearchRepository 是否有获取 id 最大值的方法?如果没有,如何编写查询来获取 id 的最大值
- google-cloud-platform - 使用 Google Cloud Storage 从自托管网站在 WhatsApp 中共享的视频链接的缩略图
- powerbi - Azure 文本分析 API 到 Power BI - 错误:Web.Contents 无法获取内容
- node.js - 如何使用过滤器 _id 找到一条记录并使用 nodejs 和 mongodb、mongoose 更新该记录
- c# - C#在同一台机器上的多个程序中接收多播UDP?