java - Flink 的 QueryableStateClient 抛出 'FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85'
问题描述
我正在尝试使用其 QueryableStateClient API 在 Flink 作业中持久化的自定义对象上查询 ValueState。我可以观察到持久性工作正常,但是来自客户端应用程序的查询会抛出FlinkRuntimeException
.
以下是客户端应用程序中使用的代码片段:
ValueStateDescriptor<CustomObj> descriptor =new ValueStateDescriptor<>("StateDesc",
TypeInformation.of(new TypeHint<>() {}).createSerializer(new ExecutionConfig()));
QueryableStateClient client = new QueryableStateClient(proxyHostName, proxyPort);
CompletableFuture<ValueState<CustomObj>> res =client.getKvState(jobId, "state-query", key,
BasicTypeInfo.STRING_TYPE_INFO, descriptor);
当我执行 ares.get()
时,以下是我得到的错误:
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
... 134 common frames omitted
Caused by: org.apache.flink.util.FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85
at org.apache.flink.queryablestate.client.QueryableStateClient.createState(QueryableStateClient.java:289)
at org.apache.flink.queryablestate.client.QueryableStateClient.lambda$getKvState$2(QueryableStateClient.java:274)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:326)
flink 作业中用于保持状态的代码片段:
ValueStateDescriptor<CustomObj> descriptor =new ValueStateDescriptor<>("StateDesc",
TypeInformation.of(new TypeHint<>() {}));
descriptor.setQueryable("state-query");
state = getRuntimeContext().getState(descriptor);
我也尝试Types.GENERIC(CustomObj.class)
过使用描述符,但没有用。仅供参考,我CustomObj
在我的 flink 工作和客户端应用程序中都使用了我的精确副本。如果我做错了什么,请突出显示。
解决方案
推荐阅读
- oracle - PL SQL中需要同时在不同会话中运行的不同SP写入单个日志文件
- python - python 编程出错 xml.etree.ElementTree.ParseError: not well-formed
- python - Odoo 电子邮件模板呈现错误
- sql-server - SQL Server 2014 按半小时间隔聚合事件持续时间
- jquery - 如何在 Web 应用程序 chrome.18n 中设计语言切换器
- python - 使用 pyserial 以 64 字节为增量在循环中发送起始地址
- python - json 文件模式/对象以触发模式以加载数据帧
- java - 为什么 Collections.min(Arrays.asList(new Base(), new Base())) 给出一个奇怪的编译错误(Eclipse)?
- javascript - 使用 webpack 构建的旧 Backbone 和 Marionette
- node.js - 在 Axios/Node 中调用正确的 get 路由