apache-flink - 如何为流以外的flink表设置并行度
问题描述
正如标题一样,当我只在我的流应用程序中使用 DataStream API 时,我使用了很多 setParallelism。
最近我发现 table API 更适合在我的场景中使用,因为它可以统一通常必须是 2 个不同的批处理/流应用程序,仅使用不同的数据集/数据流 api 节省大量重复代码。但是当我尝试将我的流应用程序移植到表 API 时。我发现 Table 不支持 setParallelism。
我必须通过这样的自定义 API 来做到这一点。
谁能帮我:
1、表不需要设置并行度,为什么?
2,如果仍然需要为表设置并行度,如何以另一种方式比我的乏味方式更好地实现这一点?
static public DataStream<Row> getRowStreamParallelismed(DataStream<Row> input, Integer parallelism) {
return ((DataStreamSource<Row>) input)
.setParallelism(parallelism)
.map((MapFunction<Row, Row>) value -> {
return value;
})
//.startNewChain()
;
}
static public Table getTableParallelismed(Table input, Integer parallelism, StreamTableEnvironment tableEnv) {
RowTypeInfo rowTypeInfo = MyType.getRowTypeInfo(input);
DataStream<Row> rowInput = tableEnv.toAppendStream(input, rowTypeInfo);
DataStream<Row> parallelismed = MyStream.getRowStreamParallelismed(rowInput, parallelism);
Table ret = tableEnv.fromDataStream(parallelismed, String.join(",", rowTypeInfo.getFieldNames()));
return ret;
}
解决方案
您可以使用table.exec.resource.default-parallelism来设置并行度值。
如果您设置此配置参数,它将覆盖 StreamContext 并行度值。
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig() // access high-level configuration
.getConfiguration() // set low-level key-value options
.setString("table.exec.resource.default-parallelism", ***parallelism***)
推荐阅读
- python - 如何将熊猫数据框中的特定行乘以条件
- f# - VSCode 中名为 test.fsx 的 F# 文件没有错误高亮显示
- ruby-on-rails - 在用户指定的时间发送邮件并在 Rails 错误中更新数据库中的状态
- sqlalchemy - 如何使用 SQLAlchemy 创建“ORDER BY Column = 'Value'”查询
- sql - 在 Mac 上下载 PostgreSQL
- api - 以编程方式转换就绪 -> 草稿 GitHub PR,反之亦然
- visual-studio - 如何创建虚拟设备?
- c# - JWT 与 C# 与通用存储库模式
- javascript - 在日历应用程序中,我们应该在客户端还是服务器端找到时区?
- gnuplot - 从wireshark绘制一个csv文件