java - Flink:找不到指定执行的兼容工厂。目标(=本地)
问题描述
我最近从 1.9.0 更新到 flink 1.10.0 并在尝试在本地执行作业时开始出现此错误。令人惊讶的是,它在 IDE 中运行良好。只有当我尝试从命令行(java -jar)运行可执行 jar 时,才会收到此错误。
这里它说要添加一个依赖项,但我已经有了。有什么想法吗?
供参考,我有:
"org.apache.flink" %% "flink-scala" "1.10.0",
"org.apache.flink" %% "flink-streaming-scala" % "1.10.0",
"org.apache.flink" %% "flink-connector-kafka" % "1.10.0",
-
Exception in thread "main" java.lang.NullPointerException: Cannot find compatible factory for specified execution.target (=local)
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1726)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1634)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:667)
at workflow.task.engineTask.DERFTask.execute(DERFTask.scala:146)
编辑:我做了一些调试,从命令行(作为 jar)运行时,“迭代器工厂”似乎是空的,而它不是来自 IDE。因此它永远不会进入while循环。诡异的..
public PipelineExecutorFactory getExecutorFactory(Configuration configuration) {
Preconditions.checkNotNull(configuration);
List<PipelineExecutorFactory> compatibleFactories = new ArrayList();
Iterator factories = defaultLoader.iterator();
while(factories.hasNext()) {
try {
PipelineExecutorFactory factory = (PipelineExecutorFactory)factories.next();
if (factory != null && factory.isCompatibleWith(configuration)) {
compatibleFactories.add(factory);
}
} catch (Throwable var5) {
if (!(var5.getCause() instanceof NoClassDefFoundError)) {
throw var5;
}
LOG.info("Could not load factory due to missing dependencies.");
}
}
if (compatibleFactories.size() > 1) {
String configStr = (String)configuration.toMap().entrySet().stream().map((e) -> {
return (String)e.getKey() + "=" + (String)e.getValue();
}).collect(Collectors.joining("\n"));
throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
} else {
return compatibleFactories.isEmpty() ? null : (PipelineExecutorFactory)compatibleFactories.get(0);
}
解决方案
我做了一些调试,从命令行(作为 jar)运行时,“迭代器工厂”似乎是空的,而它不是来自 IDE。因此它永远不会进入while循环。诡异的..
public PipelineExecutorFactory getExecutorFactory(Configuration configuration) {
Preconditions.checkNotNull(configuration);
List<PipelineExecutorFactory> compatibleFactories = new ArrayList();
Iterator factories = defaultLoader.iterator();
while(factories.hasNext()) {
try {
PipelineExecutorFactory factory = (PipelineExecutorFactory)factories.next();
if (factory != null && factory.isCompatibleWith(configuration)) {
compatibleFactories.add(factory);
}
} catch (Throwable var5) {
if (!(var5.getCause() instanceof NoClassDefFoundError)) {
throw var5;
}
LOG.info("Could not load factory due to missing dependencies.");
}
}
if (compatibleFactories.size() > 1) {
String configStr = (String)configuration.toMap().entrySet().stream().map((e) -> {
return (String)e.getKey() + "=" + (String)e.getValue();
}).collect(Collectors.joining("\n"));
throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
} else {
return compatibleFactories.isEmpty() ? null : (PipelineExecutorFactory)compatibleFactories.get(0);
}
推荐阅读
- java - JADE 是否可以拥有一个属于两个或多个容器的代理?
- python - Django 在保存时将数据更改为元组
- reactjs - Gulp 和 React 要求未定义且 browserify 无法正常工作
- excel - Excel VBA - 为什么工作表代码无法识别模块中定义的全局范围变量?
- java - 将 int 转换为 float 并返回不给出初始值
- javascript - 输入 [文本] 默认小写,但可以转换为大写
- java - 当函数作为方法参数传递时,如何测试函数的代码?
- tfs - TFS 查询以关闭 EPIC,但未关闭任何相关功能或用户故事或任务
- azure - 如何使用 azure 资源管理器模板在 linux 的虚拟机规模集上创建自定义脚本扩展?
- scikit-learn - 设置 hyperopt-sklearn 的评分方法