apache-storm - 多微批风暴拓扑
问题描述
首先,如果我的问题重复,我深表歉意,我尝试搜索但找不到与我的问题相关的答案
首先真诚的道歉,如果我问一些非常基本的问题,因为我是 Storm 的初学者。而且如果我的问题是重复的,当我尝试搜索但找不到我的问题的相关答案时
请就我的以下用例提供建议。
我的用例:
我有一个 Spout 从一个内部消息传递机制读取数据,因为它以非常高的频率(100 秒/秒)接收和发送元组。
现在除了数据之外,每个元组还有一个频率(int)(因为总共可以有 4-5 种频率)。
现在我需要设计一个 Bolt 来批处理/汇集所有元组,并且只按频率定期发出,具有只发出最新元组的功能(如果在下一批之前收到重复),因为我们在元组数据中有一个基于字符串的键识别重复项。
例如
因此,所有以 25 秒为频率的元组将被汇集在一起,并由 Bolt 每 25 秒发出一次(如果在 25 秒内收到重复的元组,则只会考虑最新的一个)。
与所有 10 分钟的元组类似,因为频率将汇集在一起,并由 Bolt 每隔 10 分钟发出一次(如果在 10 分钟内收到重复的元组,则只会考虑最新的一个)。
** 现在,由于我们可以有 4-5 种类型的频率(例如 10 秒、25 秒、10 分钟、20 分钟等,这些都是配置的),并且每个元组都应该被组合成适当的批次并发出(如上面的例子)。
供参考。对于 Bolt 分组,我使用了“fieldsGrouping”,如下配置。
*.fieldsGrouping("FILTERING_BOLT",new Fields(PUBLISHING_FREQUENCY));*
请提供帮助或建议,对于我的用例来说,最好的方法是什么,因为只是想不出实现任何东西来处理并发元组的流动和管理 Storm 的内部并行性。
解决方案
听起来你想要窗口螺栓https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html。可能你想要一个翻滚窗口(即窗口间隔之间没有重叠)
Windowing bolts 让你设置它们应该发射窗口的间隔(例如每10 秒),然后在调用你提供的execute 方法之前,bolt 将缓冲它在前10 秒接收到的所有元组。
我认为您想要的结构类似于例如
spout -> splitter -> 5 second window bolt
-> 10 second window bolt
拆分器应该接收元组,检查频率场并将元组发送到右侧窗口螺栓。您可以通过为每种频率类型声明一个流来做到这一点。
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare("5-sec-stream", ...);
declarer.declare("10-sec-stream", ...);
}
public void execute(Tuple input) {
if (frequencyIsFive(input)) {
collector.emit("5-sec-stream", new Values(input.getValues()))
}
//more cases here
}
然后当你声明你的拓扑时
topologyBuilder.setBolt("splitter", new SplitterBolt())
.shuffleGrouping("spout")
topologyBuilder.setBolt("5-second-window", new YourWindowingBolt())
.globalGrouping("splitter", "5-sec-stream")
使所有 5 秒元组转到 5 秒窗口螺栓。
有关这方面的更多信息,特别是有关流和分组的部分,请参阅https://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html 。
您可能需要注意的一件事是 Storm 的元组超时。如果您需要一个例如 10 分钟的窗口,您需要将元组超时从默认的 30 秒大幅提高,这样元组在队列中等待时不会超时。您可以通过设置来做到这一点,例如conf.setMessageTimeoutSecs(15*60)
在配置拓扑时。您希望在窗口间隔和元组超时之间有一点余地,因为您希望尽可能避免元组超时。
推荐阅读
- python - conda install conda 在全新安装时发现冲突
- python - 蟒蛇基维。[8620] 无法执行脚本 main。运行 exe 文件时,作为 .py 完美运行
- javascript - pdf.js-extractor - pdf 文件未正确解析
- android - 水平交换/翻转(镜像)整个设备屏幕的内容
- r - 如何自动为多个组添加/分配颜色
- python - 执行解压算法但得到“key error: 0”
- java - 我的接口的 java.lang.ClassNotFoundException
- spring - Spring Boot REST 项目 - JAXB 和继承问题(在非 Spring Boot 版本中工作)
- python - Sqlalchemy 1.4 创建具有无关系字段的模型
- python - $ pip2.7 损坏的 sys.stderr.write(f"ERROR: {exc}") 在 Mac 上