java - 多次执行prepare方法
问题描述
嗨,我正在使用 apache-storm 创建一个拓扑,其中我的 Spout 正在从 Kakfa 主题收集数据并将其发送到螺栓。
我正在对元组进行一些验证,并再次为其他螺栓发出流。
现在的问题是,我的第二个使用第一个螺栓流的螺栓有一个重载方法prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector)
,该方法在每 2 秒后执行一次。
拓扑代码是
topologyBuilder.setBolt("abc",new ValidationBolt()).shuffleGrouping(configurations.SPOUT_ID);
topologyBuilder.setBolt("TEST",new TestBolt()).shuffleGrouping("abc",Utils.VALIDATED_STREAM);
第一个螺栓“abc”的代码是
@Override
public void execute(Tuple tuple) {
String document = String.valueOf(tuple.getValue(4));
if (Utils.isJSONValid(document)) {
outputCollector.emit(Utils.VALIDATED_STREAM,new Values(document));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream(Utils.VALIDATED_STREAM,new Fields("document"));
}
当我搜索时,我发现
The prepare method is called when the bolt is initialised and is
similar to the open method in spout. It is called only once for the bolt.
It gets the configuration for the bolt and also the context of the bolt.
The collector is used to emit or output the tuples from this bolt.
链接到公共要点以获取日志 Storm 拓扑日志
解决方案
您的日志显示您正在使用 LocalCluster。它是一个测试/演示工具,请勿将其用于生产工作负载。而是建立一个真正的分布式集群。
关于正在发生的事情:
当您在 LocalCluster 中运行拓扑时,Storm 通过将所有组件(Nimbus、Supervisor 和 worker)作为单个 JVM 中的线程运行来模拟一个真实的集群。您的日志显示以下几行:
20:14:12.451 [SLOT_1027] 信息 oasProcessSimulator - 开始杀死进程 2ea97301-24c9-4c1a-bcba-61008693971a
20:14:12.451 [SLOT_1027] 信息 oasdwWorker - 关闭工作者 smart-transactional-data-1-1566571315 72bbf510-c342-4385-9599-0821a2dee94e 1027
20:14:15.518 [SLOT_1027] 信息 oasdsSlot - 运行 msInState 的状态:33328 topo:smart-transactional-data-1-1566571315 worker:2ea97301-24c9-4c1a-bcba-61008693971a -> kill-blob-update msInState: 3001 topo:智能交易数据 1-1566571315 工人:2ea97301-24c9-4c1a-bcba-61008693971a
20:14:15.540 [SLOT_1027] 信息 oasdwWorker - 为 smart-transactional-data-1-1566571315 启动工作人员
LocalCluster 正在关闭其中一个模拟工作者,因为其中一个 blob(例如拓扑 jar、拓扑配置、其他类型的共享文件,请参阅https://storm.apache.org/releases/2.0.0/distcache -blobstore.html ) 中的 blobstore 已更改。通常,当这种情况发生在真正的集群中时,worker JVM 将被杀死,blob 将被更新并且worker 将重新启动。由于您使用的是 LocalCluster,它只会杀死工作线程并重新启动它。这就是为什么您会看到多次调用prepare
.
推荐阅读
- javascript - append() 替代 jQuery
- javascript - 箭头函数过滤器
- python - 如何在 Python 3 中通过将字典从一个模块传递到另一个模块来运行带有 os.sys 和子进程的脚本
- createjs - 在 Createjs 中比较两个形状
- html - 如何在两个表之间添加空格
- vue.js - 为什么在 IIS 上启用 vue.js 历史模式时,文件下载会重定向到主页
- python-3.x - 如何使用python比较从行中间的每一行与单个文本文件中的每一行?
- python-3.x - Python: subprocess.call('nvm ls', shell=True) 给出这个错误 /bin/sh: nvm: command not found
- list - 如何将数据数组转换为在颤振/飞镖中展开或折叠的小部件列表?
- visual-studio-2017 - 从 Visual Studio 2017 Pro 迁移到社区版