Flink: Cannot find compatible factory for specified execution.target (=local)

Problem description

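I recently upgraded from Flink 1.9.0 to 1.10.0 and started getting this error when running a job locally. Surprisingly, it runs fine in the IDE. The error only appears when I run the executable jar from the command line (java -jar).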

Here it says to add a dependency, but I already have it. Any ideas?

For reference, I have:

"org.apache.flink" %% "flink-scala" % "1.10.0",
"org.apache.flink" %% "flink-streaming-scala" % "1.10.0",
"org.apache.flink" %% "flink-connector-kafka" % "1.10.0",

Exception in thread "main" java.lang.NullPointerException: Cannot find compatible factory for specified execution.target (=local)
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1726)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1634)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:667)
at workflow.task.engineTask.DERFTask.execute(DERFTask.scala:146)

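Edit: I did some debugging. When running from the command line (as a jar), the "factories" iterator appears to be empty, whereas it is populated when running from the IDE. As a result, the while loop is never entered. Weird...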

public PipelineExecutorFactory getExecutorFactory(Configuration configuration) {
    Preconditions.checkNotNull(configuration);
    List<PipelineExecutorFactory> compatibleFactories = new ArrayList();
    Iterator factories = defaultLoader.iterator();

    while(factories.hasNext()) {
        try {
            PipelineExecutorFactory factory = (PipelineExecutorFactory)factories.next();
            if (factory != null && factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
            }
        } catch (Throwable var5) {
            if (!(var5.getCause() instanceof NoClassDefFoundError)) {
                throw var5;
            }

            LOG.info("Could not load factory due to missing dependencies.");
        }
    }

    if (compatibleFactories.size() > 1) {
        String configStr = (String)configuration.toMap().entrySet().stream().map((e) -> {
            return (String)e.getKey() + "=" + (String)e.getValue();
        }).collect(Collectors.joining("\n"));
        throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
    } else {
        return compatibleFactories.isEmpty() ? null : (PipelineExecutorFactory)compatibleFactories.get(0);
    }
}
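
The empty iterator described in the edit matches how java.util.ServiceLoader behaves when no provider file for the interface is visible on the classpath. The sketch below illustrates that behavior in isolation. It assumes that defaultLoader in the snippet above is a ServiceLoader; DemoExecutorFactory and compatibleFactories are hypothetical names made up for this example and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class EmptyServiceLoaderDemo {

    /** Hypothetical stand-in for Flink's PipelineExecutorFactory (not a Flink type). */
    public interface DemoExecutorFactory {
        boolean isCompatibleWith(String target);
    }

    /** Collects every provider that ServiceLoader can discover for the given target. */
    static List<DemoExecutorFactory> compatibleFactories(String target) {
        List<DemoExecutorFactory> result = new ArrayList<>();
        // ServiceLoader looks for META-INF/services/<binary name of the interface>
        // on the classpath. No such file exists for this demo interface, so the
        // loop body never runs, just like the while loop in the question.
        for (DemoExecutorFactory factory : ServiceLoader.load(DemoExecutorFactory.class)) {
            if (factory.isCompatibleWith(target)) {
                result.add(factory);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<DemoExecutorFactory> found = compatibleFactories("local");
        System.out.println("factories found: " + found.size());
    }
}
```

With an empty result, a lookup like the one in the question returns null, and a subsequent checkNotNull would fail in the way the stack trace shows.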

Tags: java, scala, apache-flink, flink-streaming

Solution


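I did some debugging. When running from the command line (as a jar), the "factories" iterator in getExecutorFactory appears to be empty, whereas it is populated when running from the IDE. As a result, the while loop is never entered.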

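
One way to narrow this down is to check whether the provider file for the factory interface is actually present on the classpath of the packaged jar. The following is a minimal diagnostic sketch, not a confirmed fix. It assumes the interface's fully qualified name is org.apache.flink.core.execution.PipelineExecutorFactory and that factories are discovered through META-INF/services entries; ServiceFileCheck and findServiceFiles are hypothetical names used only for this example.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class ServiceFileCheck {

    /** Returns every classpath location of the provider file for the given service interface. */
    static List<URL> findServiceFiles(String serviceInterface) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // Fall back to the system class loader if no context class loader is set.
            loader = ClassLoader.getSystemClassLoader();
        }
        return Collections.list(loader.getResources("META-INF/services/" + serviceInterface));
    }

    public static void main(String[] args) throws IOException {
        // Assumed fully qualified name of the factory interface; pass another name as args[0].
        String service = args.length > 0
                ? args[0]
                : "org.apache.flink.core.execution.PipelineExecutorFactory";
        List<URL> files = findServiceFiles(service);
        if (files.isEmpty()) {
            System.out.println("No provider file found for " + service);
        } else {
            files.forEach(url -> System.out.println("Found: " + url));
        }
    }
}
```

Running this once from the IDE and once with the packaged jar on the classpath should show whether the two environments differ. If the file is missing only in the jar, a plausible cause (not confirmed by the source) is that the build step that assembles the jar discarded or overwrote the META-INF/services entries instead of merging them.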
