首页 > 解决方案 > Flink 任务管理器挂起

问题描述

这是程序

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

ParameterTool parameters = ParameterTool.fromArgs(args);
String ftpUri ;

env.readTextFile(ftpUri,"UTF-8")
    .map(mapFunction)
    .keyBy(tuple2 -> tuple2.f0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
    .reduce((tuple2, t1) -> {
                Collection newCol = new ArrayList<OpisRecord>();
                Collections.addAll(newCol,tuple2.f1.toArray());
                Collections.addAll(newCol,t1.f1.toArray());         
                return  new Tuple2(tuple2.f0,newCol);
            })
     .addSink(new SinktoDistributedCache());

env.execute();

适用于记录大小:10k 到 40k。但是挂断任何超过40k的东西。我尝试增加任务管理器的数量和并行性,但没有任何收获。

有什么线索吗?

标签: flink-streaming

解决方案


推荐阅读