Flink's QueryableStateClient throws 'FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85'

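Problem description

I am trying to use the QueryableStateClient API to query a ValueState that a Flink job persists for a custom object. I can see that the state is persisted correctly, but the query from the client application throws a FlinkRuntimeException.

Here is the code snippet used in the client application: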

ValueStateDescriptor<CustomObj> descriptor = new ValueStateDescriptor<>("StateDesc",
    TypeInformation.of(new TypeHint<>() {}).createSerializer(new ExecutionConfig()));

QueryableStateClient client = new QueryableStateClient(proxyHostName, proxyPort);
CompletableFuture<ValueState<CustomObj>> res = client.getKvState(jobId, "state-query", key,
    BasicTypeInfo.STRING_TYPE_INFO, descriptor);

When I call res.get(), I get the following error:

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    ... 134 common frames omitted
Caused by: org.apache.flink.util.FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85
    at org.apache.flink.queryablestate.client.QueryableStateClient.createState(QueryableStateClient.java:289)
    at org.apache.flink.queryablestate.client.QueryableStateClient.lambda$getKvState$2(QueryableStateClient.java:274)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:326)

Code snippet used in the Flink job to persist the state:

ValueStateDescriptor<CustomObj> descriptor = new ValueStateDescriptor<>("StateDesc",
    TypeInformation.of(new TypeHint<>() {}));
descriptor.setQueryable("state-query");
state = getRuntimeContext().getState(descriptor);

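I also tried using Types.GENERIC(CustomObj.class) for the descriptor, but it did not help. FYI, I use an exact copy of CustomObj in both the Flink job and the client application. Please point out anything I am doing wrong.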

Tags: java, apache-flink, flink-streaming, flink-cep

Solution
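
The error text suggests a registration mismatch rather than a missing class. A Kryo-style serializer writes a small integer ID for each registered class instead of its name, so the reader can only resolve that ID if its own registry contains the same entry. The sketch below is a toy model of that mechanism in plain Java, not Kryo or Flink code: ToyRegistry, its methods, and the empty CustomObj stand-in are all hypothetical names introduced for illustration, and real Kryo pre-registers a number of types, so the actual IDs differ.

```java
import java.util.HashMap;
import java.util.Map;

public class ClassIdMismatchSketch {

    /** Toy stand-in for an ID-based class registry; NOT the Kryo API. */
    public static final class ToyRegistry {
        private final Map<Class<?>, Integer> idsByClass = new HashMap<>();
        private final Map<Integer, Class<?>> classesById = new HashMap<>();

        /** Assigns the next free integer ID to the class, in registration order. */
        public ToyRegistry register(Class<?> type) {
            if (!idsByClass.containsKey(type)) {
                int id = idsByClass.size();
                idsByClass.put(type, id);
                classesById.put(id, type);
            }
            return this;
        }

        /** Writer side: emits the ID that stands in for the class name. */
        public int writeClassId(Class<?> type) {
            Integer id = idsByClass.get(type);
            if (id == null) {
                throw new IllegalStateException("Class is not registered: " + type.getName());
            }
            return id;
        }

        /** Reader side: resolves an ID, failing if this registry never saw it. */
        public Class<?> readClass(int id) {
            Class<?> type = classesById.get(id);
            if (type == null) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            return type;
        }
    }

    /** Hypothetical stand-in for the CustomObj class from the question. */
    public static final class CustomObj {}

    public static void main(String[] args) {
        // The "job" registry knows one more class than the "client" registry.
        ToyRegistry jobSide = new ToyRegistry().register(String.class).register(CustomObj.class);
        ToyRegistry clientSide = new ToyRegistry().register(String.class);

        int id = jobSide.writeClassId(CustomObj.class);
        try {
            clientSide.readClass(id);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // Once both sides register the same classes in the same order, the ID resolves.
        clientSide.register(CustomObj.class);
        System.out.println(clientSide.readClass(id).getSimpleName());
    }
}
```

If the same thing is happening here, the serializer the client builds from a fresh `new ExecutionConfig()` may carry fewer Kryo registrations than the one the running job uses. One thing worth checking, as an assumption rather than a confirmed fix, is whether the job's ExecutionConfig registers extra types (for example through `registerKryoType`), and whether the client's ExecutionConfig can be given the same registrations in the same order.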
