apache-flink - 当我使用 Flink SlidingEventTimeWindows 时“缓冲池被破坏”
问题描述
当我使用“SlidingEventTimeWindows”时,Flink 抛出“java.lang.IllegalStateException:缓冲池被破坏”,但是当我更改为“SlidingProcessingTimeWindows”时,一切正常。
堆栈跟踪如下:
18:37:53,728 WARN org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while emitting latency marker.
java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:147)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:683)
at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:151)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:230)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:125)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:93)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:144)
... 10 more
我终于解决了以下步骤。
首先,将 My DataMockSource 中的“collect”替换为“collectWithTimestamp”,用于生成流数据。这样做后,“发出延迟标记时出错”将在控制台中消失。
其次,将 BoundedOutOfOrdernessTimestampExtractor 替换为 AscendingTimestampExtractor,用于 EventTime 处理。在我的 DataMockSource 中,我生成数据并同时发出,因此 AscendingTimestampExtractor 是生成水印的正确方法。
我在这里发布主要代码,并在github上发布完整项目。希望它是有帮助的。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(10000); //
DataStreamSource<MockData> mockDataDataStreamSource = env.addSource(new DataMockSource());
mockDataDataStreamSource.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<MockData>() {
@Override
public long extractAscendingTimestamp(MockData element) {
return element.getTimestamp();
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> countStream = mockDataDataStreamSource
.keyBy("country").window(
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
// .allowedLateness(Time.seconds(5))
.process(
new FlinkEventTimeCountFunction()).name("count elements");
countStream.addSink(new SinkFunction<Tuple2<String, Long>>() {
@Override
public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
System.out.println(value);
}
});
env.execute("count test ");
我的 DataMockSource 在这里:
private volatile boolean running = true;
@Override
public void run(SourceContext sourceContext) throws Exception {
while (running){
MockData mockData = new MockData();
mockData.setAge(ThreadLocalRandom.current().nextInt(1,99));
mockData.setCountry("country "+ThreadLocalRandom.current().nextInt(2,5));
mockData.setId(ThreadLocalRandom.current().nextLong());
mockData.setTimestamp(Instant.now().toEpochMilli());
// emit record with timestamp
sourceContext.collectWithTimestamp(mockData,Instant.now().toEpochMilli());
// sourceContext.collect(mockData);
TimeUnit.SECONDS.sleep(3);
}
}
@Override
public void cancel() {
running = false;
}
解决方案
在事件时间工作时,您需要安排在源中或使用assignTimestampsAndWatermarks 进行时间戳提取和水印。看起来你没有这样做,这可以解释为什么你不会得到任何输出(永远不会触发事件时间窗口)。
此外,您的来源应该有一个取消方法。像这样的东西:
private volatile boolean running = true;
@Override
public void run(SourceContext ctx) throws Exception {
while (running) {
...
}
}
@Override
public void cancel() {
running = false;
}
我认为这可以解释您所看到的异常。在作业开始自行关闭后,源可能会继续运行并发送延迟标记。
推荐阅读
- dataframe - 将“列”添加到 tf.data.dataset 的最佳方法
- python - rabbitmq集群管理员是谁?如何从集群中获取数据?
- python - 根据字典更新数据框的一行
- excel - 如何更改一列未格式化的数字,以便 Excel 可以使用 VBA 将它们实际读取为数字
- tsql - concat 使用空格作为分隔符标记问题而不是空格
- python - Firebird 检查列中的类型
- c - 字符串的大小
- ios - Swift Decode JSON - 无法解码
- javascript - NestJS 无法解决 JWT_MODULE_OPTIONS 的依赖关系(同样的问题,不同的解决方案)
- browser - IP 地址可以是有效的域名吗?