json - 如何在 Apache Flink 中拆分 NodeObject 的数据
问题描述
我正在使用 Flink 处理来自某些数据源(如 Kafka、Pravega 等)的数据。
就我而言,数据源是 Pravega,它为我提供了一个 flink 连接器。
我的数据源正在向我发送一些 JSON 数据,如下所示:
{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...
这是我的一段代码:
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
@Override
public String map(ObjectNode node) throws Exception {
return node.toString();
}
})
.keyBy("word") // ERROR
.timeWindow(Time.seconds(10))
.sum("count");
如您所见,我使用了FlinkPravegaReader
和 一个适当的反序列化器来获取来自 Pravega 的 JSON 流。
然后我尝试将 JSON 数据转换为字符串,KeyBy
然后对它们进行计数。
但是,我收到一个错误:
The program finished with the following exception:
Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
myflink.StreamingJob.main(StreamingJob.java:114)
似乎KeyBy
抛出了这个异常。
好吧,我不是 Flink 专家,所以我不知道为什么。我已经阅读了官方示例的源代码WordCount
。在该示例中,有一个自定义拆分器,用于将字符串数据拆分为单词。
所以我在想在这种情况下是否也需要使用某种分离器?如果是这样,我应该使用什么样的分离器?你能给我举个例子吗?如果没有,为什么会出现这样的错误以及如何解决?
解决方案
我猜你已经阅读了关于如何指定密钥的文档
示例代码使用keyby("word")
becauseword
是 POJO 类型的字段WC
。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
在您的情况下,您在map
之前放置了一个运算符keyBy
,该运算符的输出map
是 a string
。因此,您的情况显然没有word
字段。如果你真的想对这个string
流进行分组,你需要这样写.keyBy(String::toString)
或者您甚至可以实现自定义keySelector
以生成您自己的key
.
推荐阅读
- c# - C#: O(n²) solution to finding n/k elements of an array of of size n and a number k
- macos - SSHFS 挂载远程 ssh 服务器导致分段错误:Mac OS 上的 11
- jquery - 使用jquery单击单选按钮时取消选中它本身
- css - 根据 body 类更改 sass 变量
- javascript - 那么如何将数据推送到承诺中的数组?
- wordpress - 我对持久性页面链接有疑问
- aws-api-gateway - Terraform - 如何启用 API Gateway 执行日志记录?
- python - 是否有一种相对可靠的方法来检测当前脚本是否以 PYTHONSTARTUP 运行?
- java - Java 一个用于泊松分布的函数以获得分位数
- c# - C#中的Pramp反句面试题