apache-flink - Flink 响应式模式(Reactive Mode)需要 Adaptive Scheduler
问题描述
我在 Kubernetes 上测试 Flink 的应用模式(Application Mode)时遇到了问题。下面是 JobManager(JM)的日志,有人可以帮忙看一下吗?
java.lang.IllegalStateException: Adaptive Scheduler is required for reactive mode
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.dispatcher.JobMasterServiceLeadershipRunnerFactory.createJobManagerRunner(JobMasterServiceLeadershipRunnerFactory.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:468) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:399) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$27(Dispatcher.java:954) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_292]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_292]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_292]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
2021-07-09 03:17:32,667 WARN org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
JM的定义
# Kubernetes Job that starts the Flink JobManager with the "standalone-job" entrypoint.
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      imagePullSecrets:
        - name: artifactory-container-registry
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: txo-dswim-esb-docker-local.artifactory.swg-devops.com/diak8scluster/flink-hadoop-app:fvt
          imagePullPolicy: Always
          #args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.wordcount", <optional arguments>, <job arguments>]
          # The template placeholders were removed from the live argument list; left in place
          # they would be handed over as literal arguments. Append real optional/job arguments
          # after the class name when needed.
          args: ["standalone-job", "--job-classname", "BatchWordCount"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            # Configuration files are supplied by the flink-config ConfigMap declared below.
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            # UID 0 is root, not the image's flink user (presumably UID 9999 in the official
            # image - verify before changing); change if necessary.
            runAsUser: 0
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
Dockerfile
# ---- Stage 1: fetch the explore-flink job sources and compile them with Maven ----
FROM maven:3.6-jdk-8-slim AS builder
WORKDIR /opt/explore-flink
COPY ./flinktest ./
RUN mvn clean install

# ---- Stage 2: runtime image based on Flink 1.13.1 (Scala 2.12) ----
FROM flink:1.13.1-scala_2.12
WORKDIR $FLINK_HOME

# Ship the jar-with-dependencies built in stage 1 under usrlib, owned by the flink user.
RUN mkdir -p $FLINK_HOME/usrlib
COPY --from=builder --chown=flink:flink /opt/explore-flink/target/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar

# S3 file system plugins (needed for entropy injection); each plugin gets its own directory.
RUN mkdir $FLINK_HOME/plugins/s3-fs-hadoop $FLINK_HOME/plugins/s3-fs-presto
COPY flink-s3-fs-hadoop-1.13.1.jar $FLINK_HOME/plugins/s3-fs-hadoop/
COPY flink-s3-fs-presto-1.13.1.jar $FLINK_HOME/plugins/s3-fs-presto/

# Give the flink user ownership of everything under the working directory.
RUN chown -R flink:flink .
已确认镜像中的 /opt/flink/usrlib 目录下有此 jar:
flink@38374be9bdf1:~/usrlib$ ls -lrt
total 85660
-rw-r--r-- 1 flink flink 87712073 Jul 6 10:41 flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar
flink@38374be9bdf1:~/usrlib$
Java 类
/**
* Count each word when provide the source.
*/
public class BatchWordCount {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements("Flink batch demo", "batch demo", "demo");
DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
ds.print();
}
static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : line.split(" ")) {
collector.collect(new Tuple2<String, Integer>(word,1) );
}
}
}
}
解决方案
推荐阅读
- powershell - PowerShell 模块部署
- css - 如何在SVG中设置具有绝对尺寸相对定位的元素?
- ios - 如何在针对旧 iOS 版本时添加需要 iOS 12 的自定义 Intent?
- python - 为什么大多数编程语言在换行符之前使用逗号,而不是在换行符之后?
- laravel - 在 PhpStorm 中远程管理 Laravel 项目
- jquery - 带有子页面的锚链接
- php - 如何使用“for”循环 PHP 在 PHP 中创建多个变量,
- javascript - 将 getCurrentPosition() 值推送到数组中,但无法控制数组的日志元素
- python - Python List - 对整数进行排序,然后对字符串进行排序
- html - 打开一个指向新选项卡的链接,然后打开另一个链接到刚刚打开的同一个选项卡