首页 > 解决方案 > 反应模式需要 Flink Adaptive Scheduler

问题描述

我在 Kubernetes 上测试 Flink 的 Application 模式(应用模式)时遇到了问题。JobManager(JM)的日志如下所示,有人可以帮忙看一下吗?

java.lang.IllegalStateException: Adaptive Scheduler is required for reactive mode
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.dispatcher.JobMasterServiceLeadershipRunnerFactory.createJobManagerRunner(JobMasterServiceLeadershipRunnerFactory.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:468) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:399) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$27(Dispatcher.java:954) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_292]
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_292]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_292]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
2021-07-09 03:17:32,667 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.

JM的定义

# Kubernetes batch Job that starts a Flink JobManager container with the
# `standalone-job` entrypoint arguments and mounts the Flink configuration
# from the `flink-config` ConfigMap.
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      # Pull secret for the private registry that hosts the image below.
      imagePullSecrets:
        - name: artifactory-container-registry
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: txo-dswim-esb-docker-local.artifactory.swg-devops.com/diak8scluster/flink-hadoop-app:fvt
          imagePullPolicy: Always
          #args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.wordcount", <optional arguments>, <job arguments>]
          # NOTE(review): `<optional arguments>` and `<job arguments>` look like
          # unreplaced template placeholders; as written they would be passed to
          # the entrypoint as literal strings -- confirm and remove if unintended.
          args: ["standalone-job", "--job-classname", "BatchWordCount", <optional arguments>, <job arguments>]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          # Liveness is probed with a TCP connect to the RPC port (6123).
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 0  # NOTE(review): UID 0 is root, not the flink user; the upstream Flink example uses 9999 for user _flink_ -- confirm running as root is intended
      volumes:
        # NOTE(review): flink-conf.yaml comes from this ConfigMap (not shown here).
        # If it enables reactive mode (`scheduler-mode: reactive`), be aware that
        # reactive mode is documented as streaming-only -- verify against the job type.
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

Dockerfile

# Stage 1 (builder): compile the job jar with Maven on JDK 8.
FROM maven:3.6-jdk-8-slim AS builder
# Copy the ./flinktest sources into the build directory and build them.
WORKDIR /opt/explore-flink
COPY ./flinktest /opt/explore-flink
RUN mvn clean install

# Stage 2: runtime image based on Flink 1.13.1 (Scala 2.12).
FROM flink:1.13.1-scala_2.12

WORKDIR $FLINK_HOME
# Place the fat jar built in stage 1 under $FLINK_HOME/usrlib, owned by flink.
RUN mkdir -p $FLINK_HOME/usrlib
COPY --from=builder --chown=flink:flink /opt/explore-flink/target/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar

# Create one plugin directory per S3 filesystem implementation (hadoop, presto).
# (Original author's note: "Entropy injection for S3 file systems".)
RUN mkdir $FLINK_HOME/plugins/s3-fs-hadoop \
    && mkdir $FLINK_HOME/plugins/s3-fs-presto 

# Copy the S3 filesystem jars from the build context into their plugin directories.
COPY flink-s3-fs-hadoop-1.13.1.jar $FLINK_HOME/plugins/s3-fs-hadoop
COPY flink-s3-fs-presto-1.13.1.jar $FLINK_HOME/plugins/s3-fs-presto
# Recursively give the flink user ownership of the working directory
# (set to $FLINK_HOME by the WORKDIR instruction above).
RUN chown -R flink:flink .;

已检查镜像,/opt/flink/usrlib 文件夹下确实有此 jar:

flink@38374be9bdf1:~/usrlib$ ls -lrt
total 85660
-rw-r--r-- 1 flink flink 87712073 Jul 6 10:41 flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar
flink@38374be9bdf1:~/usrlib$

Java 类

/**
 * Batch word count over a fixed, in-memory set of sample lines: each line is
 * split on single spaces, the words are grouped, and the per-word totals are
 * printed.
 */
public class BatchWordCount {

    /**
     * Builds the word-count pipeline on the execution environment and prints
     * the result.
     *
     * @param args command-line arguments (not read by this method)
     * @throws Exception propagated from the pipeline calls
     */
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        // Hard-coded sample input.
        final DataSet<String> lines = environment.fromElements("Flink batch demo", "batch demo", "demo");

        // Tokenize each line into (word, 1) pairs.
        final DataSet<Tuple2<String, Integer>> pairs = lines.flatMap(new LineSplitter());

        // Group on tuple field 0 (the word) and sum tuple field 1 (the count).
        final DataSet<Tuple2<String, Integer>> totals = pairs.groupBy(0).sum(1);

        totals.print();
    }

    /**
     * Emits one (token, 1) pair for every piece obtained by splitting the
     * line on a single space.
     */
    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            final String[] tokens = line.split(" ");
            for (int i = 0; i < tokens.length; i++) {
                collector.collect(new Tuple2<>(tokens[i], 1));
            }
        }
    }
}

标签: apache-flink

解决方案


推荐阅读