flink-streaming - Flink 任务管理器挂起
问题描述
这是程序
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
ParameterTool parameters = ParameterTool.fromArgs(args);
String ftpUri ;
env.readTextFile(ftpUri,"UTF-8")
.map(mapFunction)
.keyBy(tuple2 -> tuple2.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
.reduce((tuple2, t1) -> {
Collection newCol = new ArrayList<OpisRecord>();
Collections.addAll(newCol,tuple2.f1.toArray());
Collections.addAll(newCol,t1.f1.toArray());
return new Tuple2(tuple2.f0,newCol);
})
.addSink(new SinktoDistributedCache());
env.execute();
适用于记录大小:10k 到 40k。但是挂断任何超过40k的东西。我尝试增加任务管理器的数量和并行性,但没有任何收获。
有什么线索吗?
解决方案
推荐阅读
- android - 是否可以使用 Flutter 阅读谷歌地图导航通知?
- javascript - Javascript 对象问题无法发送特定密钥
- html - 如何同时冻结 HTML 表格的顶部和左侧
- c# - c# 从另一个类中设置类属性
- c# - Xamarin Forms 项目模板有问题吗?
- java - 如何使用同一 jdbc 驱动程序的不同版本设置多个数据源?
- php - PHP preg_replace 填充一个数字
- excel - 如何将包含值和频率的表格转换为 Excel 2016 中的单个列表?
- c++ - 是否有一种优雅而快速的方法来测试整数中的 1 位是否位于连续区域中?
- python-3.x - 使用 pycharm 在 Django 中创建项目