
Problem description

Apache Flink - simple windowing in a job - java.lang.RuntimeException: segment has been freed

Hi,

I'm a Flink newbie. In my job I'm trying to use a window to simply aggregate elements so that their processing is delayed:

src = src.timeWindowAll(Time.milliseconds(1000)).process(new BaseDelayingProcessAllWindowFunctionImpl());

The process window function just collects the input elements:

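import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseDelayingProcessAllWindowFunction<IN> extends ProcessAllWindowFunction<IN, IN, TimeWindow> {

    private static final long serialVersionUID = 1L;

    protected Logger logger;

    public BaseDelayingProcessAllWindowFunction() {
        logger = LoggerFactory.getLogger(getClass());
    }

    // Called once per fired window: forwards every buffered element unchanged
    @Override
    public void process(Context context, Iterable<IN> elements, Collector<IN> out) throws Exception {
        for (IN in : elements) {
            out.collect(in);
        }
    }
}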
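For context, timeWindowAll(Time.milliseconds(1000)) assigns every element to a non-keyed tumbling window of 1000 ms, and the function above receives all elements of a window when it fires and forwards them unchanged. The plain-Java sketch below models only that grouping and forwarding. The class and method names are hypothetical; it ignores timers and watermarks (it emits after all input has been seen) and assumes windows aligned to the epoch with offset 0, which is how Flink aligns tumbling windows by default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java model of a tumbling all-window followed by a
 * process function that forwards every element. Does not use Flink.
 */
public class TumblingAllWindowSketch {

    /** Start of the tumbling window containing the timestamp (offset 0, epoch-aligned). */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Buffers (timestamp, value) pairs per window, then emits windows in start-time order. */
    static List<String> run(long[] timestamps, String[] values, long size) {
        // Window start -> buffered elements; TreeMap keeps windows ordered by start time
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            windows.computeIfAbsent(windowStart(timestamps[i], size), k -> new ArrayList<>())
                   .add(values[i]);
        }
        List<String> out = new ArrayList<>();
        for (List<String> elements : windows.values()) {
            // Equivalent of process(): forward each buffered element as-is
            for (String in : elements) {
                out.add(in);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {100, 1500, 900, 2100};
        String[] vs = {"a", "b", "c", "d"};
        System.out.println(run(ts, vs, 1000));
    }
}
```

With a 1000 ms size, timestamps 100 and 900 fall into the same window [0, 1000), so `main` prints `[a, c, b, d]`: the window delays elements but does not drop or change them.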

The problem is that I get the following error while debugging locally (launching the job from Eclipse):

 [2019-01-18 14:38:18,753] INFO Running job on local embedded Flink mini cluster (org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)


 [2019-01-18 14:38:30,825] INFO Source: dataSource -> Flat Map (1/1) (3677b50300c3c432e862af413796ee5d) switched from RUNNING to FAILED. (org.apache.flink.runtime.taskmanager.Task:940)
TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:691)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$AutomaticWatermarkContext$WatermarkEmittingTask.onProcessingTime(StreamSourceContexts.java:264)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
    ... 7 more
Caused by: java.lang.RuntimeException: segment has been freed
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:691)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:759)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
    ... 10 more
Caused by: java.lang.IllegalStateException: segment has been freed
    at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:228)
    at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:381)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:85)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:97)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:117)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:87)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
    ... 13 more
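Reading the trace from the bottom up: the innermost cause is HybridMemorySegment.put rejecting a write, and the outermost frame shows that write came from SystemProcessingTimeService$TriggerTask, i.e. a timer thread emitting an automatic watermark. The toy model below is hypothetical and far simpler than Flink's HybridMemorySegment; it only illustrates what the message means: once a segment's memory has been released, any late writer fails with this IllegalStateException. The trace itself does not show why the buffer was released in this job; one assumption worth checking is that it was freed while the task was being torn down after some earlier failure.

```java
/**
 * Hypothetical, simplified model of a memory segment that rejects writes
 * after being freed. Not Flink's HybridMemorySegment.
 */
public class FreedSegmentSketch {

    static final class Segment {
        private byte[] memory;

        Segment(int size) {
            memory = new byte[size];
        }

        void free() {
            // Release the backing memory; any later write must fail
            memory = null;
        }

        void put(int index, byte b) {
            if (memory == null) {
                throw new IllegalStateException("segment has been freed");
            }
            memory[index] = b;
        }
    }

    /** Writes, frees, then writes again; returns the late writer's error message or "ok". */
    static String writeAfterRelease() {
        Segment segment = new Segment(32);
        segment.put(0, (byte) 1);      // normal write while the owner is alive
        segment.free();                // memory released (assumed: during teardown)
        try {
            segment.put(1, (byte) 2);  // late write, e.g. from a timer callback
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAfterRelease());
    }
}
```

Running `main` prints `segment has been freed`.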

Googling led me to think this error is related to an OOM error, so I tried the following (all of which failed):

I tried hacking defaultLocalParallelism to change it from 8 to 1:

private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

public static LocalStreamEnvironment createLocalEnvironment() {
    return createLocalEnvironment(defaultLocalParallelism);
}

I also tried increasing the memory (-Xms4096m -Xmx4096m -Xmn512m) and reducing the window size to 10 ms, but none of that helped.
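The -Xms/-Xmx flags only reach the job if they are set as VM arguments of the Eclipse run configuration that launches it (eclipse.ini configures the IDE's own JVM). A quick stdlib check, run from the same launch configuration, prints what the JVM actually received together with the processor count that defaultLocalParallelism is derived from. The class name is hypothetical.

```java
/** Hypothetical helper: reports the heap limit and CPU count of the running JVM. */
public class HeapCheck {

    /** Maximum heap the JVM will attempt to use, in MiB (may be slightly below the -Xmx value). */
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    public static void main(String[] args) {
        System.out.println("max heap (MiB): " + maxHeapMiB());
        // Same value the default local environment uses as its parallelism
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
    }
}
```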

Please advise.

Update

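Following the comments, and to narrow the problem down, I reduced my complex job to the single print statement below, but I still get the same error: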

DataStream<String> dataStream = getSource(KAFKA_DATA_SOURCE_NAME).getDataStream();

SingleOutputStreamOperator<String> out2 = dataStream.timeWindowAll(Time.milliseconds(10)).process(new StringDelayingProcessAllWindowFunction());

out2.print();

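The window function subclass is declared as follows. It adds no implementation of its own, so it only inherits the forwarding logic of the base class: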

public class StringDelayingProcessAllWindowFunction extends BaseDelayingProcessAllWindowFunction<String> {

    private static final long serialVersionUID = 1L;

}

Does the mini cluster need any special settings, or are there other window settings I am missing?

Update 2

I can confirm that this ugly problem only happens in the mini cluster environment:

Running job on local embedded Flink mini cluster (org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)

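When I submit the same job to a test cluster, this simple job runs without the error. So the question is how to run windows in the mini cluster. Switching to a 32-bit JDK didn't help either.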

Tags: java, apache-flink, flink-streaming

Solution

