首页 > 解决方案 > 如何在 Apache Flink 中拆分 NodeObject 的数据

问题描述

我正在使用 Flink 处理来自某些数据源(如 Kafka、Pravega 等)的数据。

就我而言,数据源是 Pravega,它为我提供了一个 flink 连接器。

我的数据源正在向我发送一些 JSON 数据,如下所示:

{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...

这是我的一段代码:

PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
        @Override
        public String map(ObjectNode node) throws Exception {
            return node.toString();
        }
    })
    .keyBy("word")    // ERROR
    .timeWindow(Time.seconds(10))
    .sum("count");

如您所见,我使用了FlinkPravegaReader和 一个适当的反序列化器来获取来自 Pravega 的 JSON 流。

然后我尝试将 JSON 数据转换为字符串,KeyBy然后对它们进行计数。

但是,我收到一个错误:

 The program finished with the following exception:

Field expression must be equal to '*' or '_' for non-composite types.
        org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
        org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
        myflink.StreamingJob.main(StreamingJob.java:114)

似乎KeyBy抛出了这个异常。

好吧,我不是 Flink 专家,所以我不知道为什么。我已经阅读了官方示例的源代码WordCount。在该示例中,有一个自定义拆分器,用于将字符串数据拆分为单词。

所以我在想在这种情况下是否也需要使用某种分离器?如果是这样,我应该使用什么样的分离器?你能给我举个例子吗?如果没有,为什么会出现这样的错误以及如何解决?

标签: jsonapache-flinkflink-streaming

解决方案


我猜你已经阅读了关于如何指定密钥的文档

指定键

示例代码使用keyby("word")becauseword是 POJO 类型的字段WC

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

在您的情况下,您在map之前放置了一个运算符keyBy,该运算符的输出map是 a string。因此,您的情况显然没有word字段。如果你真的想对这个string流进行分组,你需要这样写.keyBy(String::toString)

或者您甚至可以实现自定义keySelector以生成您自己的key.

自定义键选择器


推荐阅读