Why is my Apache Storm 2.0 topology restarted after 30s?

Problem description

I've tried several configuration parameters and even using withLocalModeOverride, with no luck. What am I missing here?

Here's a sample application; after 30s the counter resets and everything starts over. Please let me know if I can provide additional details.

Usage:

mvn package
java -jar target/Test-0.1.0.jar

src/main/java/Test.java:

package storm.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

class Test {
    private static class Spout extends BaseRichSpout {
        private SpoutOutputCollector spoutOutputCollector;
        private long n;

        @Override
        public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            LOG.error("InfiniteSpout::nextTuple {}", n);
            spoutOutputCollector.emit(new Values(n++));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("x"));
        }
    }

    private static class Bolt extends BaseRichBolt {
        @Override
        public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {

        }

        @Override
        public void execute(Tuple tuple) {
            Long x = tuple.getLongByField("x");
            LOG.error("Bolt::execute {}", x);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(Test.class);

    public static void main(String[] args) {
        try {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new Spout());
            builder.setBolt("bolt", new Bolt()).shuffleGrouping("spout");

            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            StormTopology topology = builder.createTopology();
            cluster.submitTopology("test", conf, topology);
        } catch (Exception e) {
            LOG.error(e.getMessage());
        }
    }
}

pom.xml:

<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>Test</groupId>
    <artifactId>Test</artifactId>
    <version>0.1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>storm.test.Test</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Tags: java, apache-storm

Solution


Here's the actual issue:

This is https://issues.apache.org/jira/browse/STORM-3501. This issue only affects running in LocalClusters, and then only if you don't have a resources directory in the jar you generate.

You can work around this by adding a resources directory to your jar. With your pom, you want to add a src/main/resources/resources directory.
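Since an empty directory can be dropped by some packaging tools, one way to apply the workaround is to create the directory with a placeholder file in it. The helper class and the `.keep` filename below are my own convention, not part of Storm or Maven; a plain `mkdir -p src/main/resources/resources` plus an empty file works just as well:

```java
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical one-off helper: creates the extra "resources" directory
// so the generated jar contains a "resources/" entry (STORM-3501 workaround).
class CreateResourcesDir {
    public static void main(String[] args) throws Exception {
        // src/main/resources is Maven's resource root, so the nested
        // "resources" directory lands at the jar root as "resources/"
        Files.createDirectories(Paths.get("src/main/resources/resources"));
        // Placeholder file so the empty directory survives packaging
        Files.write(Paths.get("src/main/resources/resources/.keep"), new byte[0]);
    }
}
```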

Regarding things to consider when running single-node Storm, I think you should think hard about whether Storm is the right choice for your use case. Storm is reasonably complex, and much of the complexity is because we want it to be able to distribute computation across many physical machines. If you're going to run all your computation on one machine, you might not really be gaining much by using Storm over e.g. just writing a regular Java application, or using something like Apache Camel.

Other things to consider when running single-node:

  • Storm is fail-fast, so if you get any errors, the entire worker will crash. As you are running on a single machine, you might take down a significant fraction of your cluster (default is 4 workers per machine, so you'll lose a quarter of your state any time an error occurs).

  • Don't use LocalCluster for production workloads; it isn't designed for that. Set up a real Storm installation, and then just run it on one machine.

Here's some stuff that jumps out at me, maybe some of it will help:

  • You need to add a sleep after cluster.submitTopology, or your program will just quit immediately. That call doesn't block; it just submits the topology to the LocalCluster and returns. When your main method exits, the LocalCluster will likely shut down with it. In a "real" setup you would be submitting to a cluster that runs as a separate process, so this wouldn't be an issue, but when using LocalCluster you need to make the main thread wait until you want to close the program.

  • Just in case you end up using similar code in a test, you should remember to close the LocalCluster when you're done. It's AutoCloseable, so you can just put it in a try-with-resources.

  • It's good practice to ack tuples in bolts. Consider extending BaseBasicBolt if you just want to ack when your bolt is done with a tuple.
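Putting those points together, here's a rough sketch of how the main method and bolt could look. This is an illustration under my assumptions, not a drop-in replacement: `Spout` is the class from the question, `AckingBolt` is a name I made up, and the sleep duration is arbitrary:

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

class TestFixed {
    // BaseBasicBolt acks each tuple automatically once execute returns,
    // so there's no collector bookkeeping to do for a simple bolt like this.
    private static class AckingBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple.getLongByField("x"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output stream
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new Spout()); // the Spout from the question
        builder.setBolt("bolt", new AckingBolt()).shuffleGrouping("spout");

        // try-with-resources: LocalCluster is AutoCloseable,
        // so it is shut down cleanly when the block exits
        try (LocalCluster cluster = new LocalCluster()) {
            cluster.submitTopology("test", new Config(), builder.createTopology());
            // submitTopology returns immediately; keep the main thread
            // alive for as long as you want the topology to run
            Thread.sleep(120_000);
        }
    }
}
```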

