java - Why is my Apache Storm 2.0 topology restarted after 30s?
Problem description
I've tried several configuration parameters and even used withLocalModeOverride, with no luck. What am I missing here?
Here's a sample application; after 30 s the counter is reset and everything starts over. Please let me know if I can provide additional details.
Usage:
mvn package
java -jar target/Test-0.1.0.jar
src/main/java/Test.java:
package storm.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

class Test {
    private static class Spout extends BaseRichSpout {
        private SpoutOutputCollector spoutOutputCollector;
        private long n;

        @Override
        public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            LOG.error("InfiniteSpout::nextTuple {}", n);
            spoutOutputCollector.emit(new Values(n++));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("x"));
        }
    }

    private static class Bolt extends BaseRichBolt {
        @Override
        public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        }

        @Override
        public void execute(Tuple tuple) {
            Long x = tuple.getLongByField("x");
            LOG.error("Bolt::execute {}", x);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(Test.class);

    public static void main(String[] args) {
        try {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new Spout());
            builder.setBolt("bolt", new Bolt()).shuffleGrouping("spout");
            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            StormTopology topology = builder.createTopology();
            cluster.submitTopology("test", conf, topology);
        } catch (Exception e) {
            LOG.error(e.getMessage());
        }
    }
}
pom.xml:
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>Test</groupId>
  <artifactId>Test</artifactId>
  <version>0.1.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>2.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.1.2</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>Test</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/lib</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Solution
Here's the actual issue:
This is https://issues.apache.org/jira/browse/STORM-3501. This issue only affects running in LocalClusters, and then only if you don't have a resources directory in the jar you generate.
You can work around this by adding a resources directory to your jar. With your pom, you want to add a src/main/resources/resources directory.
Regarding things to consider when running single-node Storm, I think you should think hard about whether Storm is the right choice for your use case. Storm is reasonably complex, and much of the complexity is because we want it to be able to distribute computation across many physical machines. If you're going to run all your computation on one machine, you might not really be gaining much by using Storm over e.g. just writing a regular Java application, or using something like Apache Camel.
Other things to consider when running single-node:
Storm is fail-fast, so if you get any errors, the entire worker will crash. As you are running on a single machine, you might take down a significant fraction of your cluster (default is 4 workers per machine, so you'll lose a quarter of your state any time an error occurs).
Don't use LocalCluster for production workloads; it isn't designed for that. Set up a real Storm installation, and then just run that on one machine.
Here are some other things that jump out at me; maybe some of them will help:
You need to add a sleep after cluster.submitTopology, or your program will just quit immediately. That call doesn't block; it submits the topology to the LocalCluster and returns. When your main method exits, the LocalCluster will likely also shut down. In a "real" setup you would be submitting to a cluster that runs as a separate process, so this wouldn't be an issue, but when using LocalCluster you need to make the main thread wait until you want to close the program.
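As a sketch, the end of the posted main method could look like this (the 60-second sleep is an arbitrary choice for illustration, not from the original post):

```java
cluster.submitTopology("test", conf, topology);
// submitTopology returns immediately; block the main thread so the
// LocalCluster isn't torn down as soon as main exits
Thread.sleep(60_000);
cluster.shutdown();
```

In a real test you would typically wait on a condition (e.g. a latch counted down by the bolt) rather than sleeping for a fixed time.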
Just in case you end up using similar code in a test, remember to close the LocalCluster when you're done. It's AutoCloseable, so you can just put it in a try-with-resources statement.
It's good practice to ack tuples in bolts. Consider extending BaseBasicBolt if you just want to ack when your bolt is done with a tuple.
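For example, the Bolt above could extend BaseBasicBolt instead (a sketch, keeping the original field name "x"); with BaseBasicBolt, the framework acks each tuple automatically when execute returns without throwing:

```java
private static class Bolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Long x = tuple.getLongByField("x");
        LOG.error("Bolt::execute {}", x);
        // no explicit ack needed: the tuple is acked when execute returns normally,
        // and failed if it throws a FailedException
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
```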