spring-boot - 我的风暴螺栓无法在集群模式下反序列化
问题描述
我用springboot和storm做demo,在本地模式下工作,但是在集群模式下提交jar报错
./storm jar storm-demo3-0.0.1-SNAPSHOT.jar org.springframework.boot.loader.JarLauncher simpleBoot
当我使用 maven-compiler-plugin 移动 springBoot 并打包时,它可以正常工作
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
这是主管的错误
java.lang.RuntimeException: java.lang.ClassNotFoundException: com.fosung.share.stormdemo3.bolt.FilterBolt
at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:259) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.utils.Utils.getSetComponentObject(Utils.java:507) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.task$get_task_object.invoke(task.clj:76) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.task$mk_task_data$fn__6524.invoke(task.clj:180) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.util$assoc_apply_self.invoke(util.clj:931) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.task$mk_task_data.invoke(task.clj:172) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.task$mk_task.invoke(task.clj:184) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.executor$mk_executor$fn__10662.invoke(executor.clj:379) ~[storm-core-1.2.2.jar:1.2.2]
at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) ~[clojure-1.7.0.jar:?]
at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.executor$mk_executor.invoke(executor.clj:380) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.worker$fn__11300$exec_fn__2470__auto__$reify__11302$iter__11307__11311$fn__11312.invoke(worker.clj:663) ~[storm-core-1.2.2.jar:1.2.2]
at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
at clojure.core$dorun.invoke(core.clj:3009) ~[clojure-1.7.0.jar:?]
at clojure.core$doall.invoke(core.clj:3025) ~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn__11300$exec_fn__2470__auto__$reify__11302.run(worker.clj:663) ~[storm-core-1.2.2.jar:1.2.2]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_152]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_152]
at org.apache.storm.daemon.worker$fn__11300$exec_fn__2470__auto____11301.invoke(worker.clj:633) ~[storm-core-1.2.2.jar:1.2.2]
at clojure.lang.AFn.applyToHelper(AFn.java:178) ~[clojure-1.7.0.jar:?]
at clojure.lang.AFn.applyTo(AFn.java:144) ~[clojure-1.7.0.jar:?]
at clojure.core$apply.invoke(core.clj:630) ~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn__11300$mk_worker__11391.doInvoke(worker.clj:605) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$_main.invoke(worker.clj:798) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.AFn.applyToHelper(AFn.java:165) [clojure-1.7.0.jar:?]
at clojure.lang.AFn.applyTo(AFn.java:144) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker.main(Unknown Source) [storm-core-1.2.2.jar:1.2.2]
Caused by: java.lang.ClassNotFoundException: com.fosung.share.stormdemo3.bolt.FilterBolt
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_152]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_152]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338) ~[?:1.8.0_152]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_152]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_152]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_152]
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:683) ~[?:1.8.0_152]
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1863) ~[?:1.8.0_152]
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1746) ~[?:1.8.0_152]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2037) ~[?:1.8.0_152]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_152]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) ~[?:1.8.0_152]
at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253) ~[storm-core-1.2.2.jar:1.2.2]
... 38 more
2019-05-22 11:09:14.684 o.a.s.util main [ERROR] Halting process: ("Error on initialization")
java.lang.RuntimeException: ("Error on initialization")
at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn__11300$mk_worker__11391.doInvoke(worker.clj:605) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$_main.invoke(worker.clj:798) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.AFn.applyToHelper(AFn.java:165) [clojure-1.7.0.jar:?]
at clojure.lang.AFn.applyTo(AFn.java:144) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker.main(Unknown Source) [storm-core-1.2.2.jar:1.2.2]
我的 pom.xml
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<!--<scope>provided</scope>-->
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-web</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<artifactId>ring-cors</artifactId>
<groupId>ring-cors</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
我的拓扑
public class MyTopology {
public static void main(String[] args) {
System.out.println("MyTopology main start");
// 定义一个拓扑
TopologyBuilder builder = new TopologyBuilder();
// 设置1个Executeor(线程),默认一个
DataSpout dataSpout = new DataSpout();
builder.setSpout("spoutId", dataSpout);
// shuffleGrouping:表示是随机分组
// 设置1个Executeor(线程),和两个task
FilterBolt filterBolt = new FilterBolt();
InsertBolt insertBolt = new InsertBolt();
builder.setBolt("filterBolt", filterBolt).setNumTasks(1).allGrouping("spoutId", "spoutId");
builder.setBolt("insertBolt", insertBolt).setNumTasks(1).allGrouping("filterBolt", "spoutId");
Config conf = new Config();
try {
// 有参数时,表示向集群提交作业,并把第一个参数当做topology名称
// 没有参数时,本地提交
if (args != null && args.length > 0) {
System.out.println("运行远程模式");
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
// 启动本地模式
System.out.println("运行本地模式");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TopologyApp", conf, builder.createTopology());
}
} catch (Exception e) {
System.out.println("storm启动失败!程序退出!");
System.exit(1);
e.printStackTrace();
}
// System.out.println("storm启动成功...");
}
}
我的喷口
public class DataSpout extends BaseRichSpout {
SpoutOutputCollector collector;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
System.out.println("spout open");
}
@Override
public void nextTuple() {
/*try {
Thread.sleep(1000);
return;
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println("spout nextTuple start");
int rndomn = (int)Math.random() * 1000;
collector.emit("spoutId", new Values(rndomn));
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("spoutId", new Fields("spoutId"));
}
}
我的螺栓
public class FilterBolt extends BaseRichBolt {
OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
System.out.println("filter bolt start");
Integer o = (Integer) input.getValues().get(0);
if (o>10){
collector.emit("spoutId", new Values(o));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//定义下个bolt接收streamId
declarer.declareStream("spoutId", new Fields("spoutId"));
}
}
解决方案
Spring(引导)不适合 Storm。Storm 是一个框架,这意味着它负责管理一些类的生命周期,比如你的 bolt。由于 Storm 对 Spring 一无所知,因此 Spring 的依赖注入不能开箱即用。可以将 Spring 设置为在 Storm 应用程序的某些部分上工作,例如任务和工作挂钩,这可以让您在 Storm 工作人员中创建 Spring 上下文。我不认为我会推荐它,除非你有充分的理由需要 Spring。
关于您遇到的错误,Storm 未能在您提交的 jar 中找到您的类之一。由于您没有为 Spring 配置发布 pom.xml,因此很难说,但也许您正在使用一个插件来移动您的类。当您向 Storm 提交拓扑时,Storm 会运行几个您应该了解的阶段:
首先你做storm jar com.yourcompany.yourMain
。这将在您的本地机器(或运行命令的任何地方)上启动一个 JVM,它运行您的拓扑设置,在您的情况下为MyTopology.main
. 然后设置将你的 spout 和 bolt 序列化,并将 jar 和序列化拓扑发送到 Nimbus(一个单独的 JVM),然后将其发送给主管(另一个单独的 JVM)。在监督者上,监督者 JVM 会启动许多工作 JVM 来运行您的拓扑。每个工作 JVM 都以类似java -cp your-topology.jar org.apache.storm.Worker
. worker JVM 加载序列化的拓扑,以及拓扑 jar 中的类,并启动线程以运行 spout 和 bolt。
这些阶段很可能是它失败的原因。当您运行拓扑设置代码时,您使用的是 Spring Boot 命令,因此 Spring Boot 有机会运行。当拓扑在工作机器上启动时,JVM 会通过对非 Spring main 方法的常规旧调用启动,因此 Spring 没有机会运行。
如果您决定不使用 Spring,您可以在此处找到一个工作示例 POM 。
推荐阅读
- collision - 如果重叠则弹出阻塞对象:UE4
- javascript - composer 需要 dompdf/dompdf 不安装
- docker - 如何在 docker-compose v3 上定义静态 ip
- java - 调用 fcm.googleapis.com/fcm/send 时连接关闭
- python - 在 Python 中查找字符串的所有字典顺序的替代方法
- jenkins - 在没有从站的远程主机上执行 Jenkins 管道
- python - Numpy fromfunction 返回错误:用作索引的数组必须是整数(或布尔)类型
- android - 'SharedPreferencesUtils()' 在 'com.google.android.gms.common.util.SharedPreferencesUtils 中具有私有访问权限
- java - 如何证明String是否以int结尾?
- angular - 后台模式 plugin_not_installed ,甚至后台模式插件已经安装在 ionic 项目中