首页 > 解决方案 > 是否可以在 Apache Flink 中使仅地图任务并行执行

问题描述

我正在使用 Flink 处理一些 JSON 格式的流数据:

{"uuid":"903493290432934", "bin": "68.3"}
{"uuid":"324938722984237", "bin": "56.8"}
...

我的工作很简单:

从数据源获取流 ---> 将数据反序列化为字符串 ---> 将字符串转换为 JSON 对象myJsonObj---> double res = myJsonObj.get("bin")---> 使用res.

这是我的代码:

FlinkPravegaReader<String> source = ... // init source
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// transform String to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
    .map(new MapFunction<String, MyJson>() {
        @Override
        public MyJson map(String s) throws Exception {
            MyJson myJson = JSON.parseObject(s, MyJson.class);
            return myJson;
        }
    });
// do the heavy process
DataStream<String> heavyResult = jsonStream
    .map(new MapFunction<MyJson, String>() {
        @Override
        public String map(MyJson myJson) throws Exception {
            double res = myJson.get("bin");
            // do some very heavy calculation
            return myJson.get("uuid").asText() + " done.";
        }
    });
heavyResult.print();

据我了解,我没有使用任何keyBy/window,所以我认为我windowAll默认使用了。我对吗?

如果我是对的,Flink 的文档告诉我windowAll不能以并行方式运行。那么是不是意味着我必须一一进行繁重的计算呢?我在想是否可以并行进行繁重的计算。

如您所见,就我而言,使用似乎没有keyBy/window任何意义。那么如何让这个案例并行执行呢?是否可以使两个作业与以下相同的数据源一起运行?

             /----windowAll ---- do the heavy calculation
            /
Data Source-
            \
             \----windowAll ---- do the heavy calculation

这种设计可行吗?假设数据源生成三个元素:A 和 B。通过这种设计,我期望一个 windowAll 处理 A,而另一个 windowAll 处理 B。

标签: parallel-processingapache-flinkflink-streaming

解决方案


密钥流用于在数据中创建分区,因此来自同一密钥的所有流量都将发送到同一任务管理器。

当您想要聚合流中的元素以出于给定原因将它们计算为一个集合时,将使用窗口。

如果您的案例不适合上述案例,则不要使用它们。

要为整个流提供并行性,只需使用

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);  //Notice you'll need 3 taskmanagers slots available.

要为单个运算符(大量计算)定义并行性,请使用:

DataStream<String> heavyResult = jsonStream
.map(new MapFunction<MyJson, String>() {
    @Override
    public String map(MyJson myJson) throws Exception {
        double res = myJson.get("bin");
        // do some very heavy calculation
        return myJson.get("uuid").asText() + " done.";
    }
}).setParallelism(3);  //Notice you'll need 3 taskmanagers slots available.

更多信息请访问https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/parallel.html


推荐阅读