scala - 如何查询flink的可查询状态
问题描述
我正在使用 flink 1.8.0 并且正在尝试查询我的工作状态。
val descriptor = new ValueStateDescriptor("myState", Types.CASE_CLASS[Foo])
descriptor.setQueryable("my-queryable-State")
我使用端口 9067这是默认端口,我的客户:
val client = new QueryableStateClient("127.0.0.1", 9067)
val jobId = JobID.fromHexString("d48a6c980d1a147e0622565700158d9e")
val execConfig = new ExecutionConfig
val descriptor = new ValueStateDescriptor("my-queryable-State", Types.CASE_CLASS[Foo])
val res: Future[ValueState[Foo]] = client.getKvState(jobId, "my-queryable-State","a", BasicTypeInfo.STRING_TYPE_INFO, descriptor)
res.map(_.toString).pipeTo(sender)
但我得到:
[ERROR] [06/25/2019 20:37:05.499] [bvAkkaHttpServer-akka.actor.default-dispatcher-5] [akka.actor.ActorSystemImpl(bvAkkaHttpServer)] Error during processing of request: 'org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:9067'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:9067
- 我究竟做错了什么 ?
- 我应该如何以及在哪里定义
QueryableStateOptions
解决方案
所以如果你想使用QueryableState
你需要将正确的 Jar 添加到你的 flink 中。jar 是flink-queryable-state-runtime
,它可以opt
在您的 flink 发行版中的文件夹中找到,您应该将它移到该lib
文件夹中。
至于第二个问题,QueryableStateOption
它只是一个用于创建静态ConfigOption
定义的类。然后使用这些定义从flink-conf.yaml
文件中读取配置。所以目前唯一的配置选项QueryableState
是使用 flink 发行版中的 flink-conf 文件。
编辑:另外,请尝试阅读此] 1它提供了有关可查询状态如何工作的更多信息。您不应该真正直接连接到服务器端口,而是应该使用代理端口,默认情况下是9069
.
推荐阅读
- android - LazyColumn 中的 Jetpack Compose 常规项目
- javascript - 使用管道和地图创建过滤器
- javascript - 正则表达式用除以 2 替换 "\n" 出现
- javascript - JavaScript Concat Unicode
- git - 如何从 GitHub 的主存储库中删除特定的分叉本地克隆?
- python - 如何使用烧瓶 mongoengine 中的引用填充查询?
- pandas - 如何将 pandas 多索引转换为表列和索引?
- python - 将 Docker 与 Python 单元测试相结合的最佳实践是什么?
- swift - SKPhysicsContact 没有用子弹接触敌人
- networking - 确定网络中的往返时间值