首页 > 解决方案 > 如何为流以外的flink表设置并行度

问题描述

正如标题一样,当我只在我的流应用程序中使用 DataStream API 时,我使用了很多 setParallelism。

最近我发现 table API 更适合在我的场景中使用,因为它可以统一通常必须是 2 个不同的批处理/流应用程序,仅使用不同的数据集/数据流 api 节省大量重复代码。但是当我尝试将我的流应用程序移植到表 API 时。我发现 Table 不支持 setParallelism。

我必须通过这样的自定义 API 来做到这一点。

谁能帮我:

1、表不需要设置并行度,为什么?

2,如果仍然需要为表设置并行度,如何以另一种方式比我的乏味方式更好地实现这一点?

static public DataStream<Row> getRowStreamParallelismed(DataStream<Row> input, Integer parallelism) {
    return ((DataStreamSource<Row>) input)
            .setParallelism(parallelism)
            .map((MapFunction<Row, Row>) value -> {
                return value;
            })
            //.startNewChain()
            ;
}

static public Table getTableParallelismed(Table input, Integer parallelism, StreamTableEnvironment tableEnv) {
    RowTypeInfo rowTypeInfo = MyType.getRowTypeInfo(input);
    DataStream<Row> rowInput = tableEnv.toAppendStream(input, rowTypeInfo);
    DataStream<Row> parallelismed = MyStream.getRowStreamParallelismed(rowInput, parallelism);
    Table ret = tableEnv.fromDataStream(parallelismed, String.join(",", rowTypeInfo.getFieldNames()));
    return ret;
} 

标签: apache-flink

解决方案


您可以使用table.exec.resource.default-parallelism来设置并行度值。

如果您设置此配置参数,它将覆盖 StreamContext 并行度值。

// instantiate table environment
TableEnvironment tEnv = ...

tEnv.getConfig()        // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.resource.default-parallelism", ***parallelism***)

推荐阅读