apache-flink - 流模拟过程中缺乏再现性
问题描述
我有一个 Flink 流程序,它对键控窗口流执行某些操作。运算符使用KeyedStateStore
返回的 by Context#globalState()
(参见Flink 文档)。
在实际的实时流中,我没有问题。但是,我有一些特殊的场景需要模拟流,为此我需要加载一定数量的数据并按时间戳排序。我必须分配水印的运算符以可以处理这两种情况的方式实现。
我的问题是,除非我以 parallelism=1 执行流模拟,否则我没有可重现的结果。我认为我的源可以以比我的窗口处理函数更快的速度发出事件,并且可能给定键的多个窗口以不一定保持时间顺序的方式排队。由于窗口进程函数会根据时间顺序修改状态,因此可能存在使状态不一致的竞争条件。
在模拟版本中,甚至不允许迟到,所以这应该不是问题。Parallelism=1 总是产生相同的结果(一些单元测试检查这一点)。
谁能确认流模拟是否是 Flink 支持的用例?如果是,如果确认上述行为是否会成为错误?
更新 1
以下是流管道的高级概述:
input = source with default parallelism (message bus could have multiple partitions)
keySelector = CustomKeySelector
timestampedStream = input
.assignTimestampsAndWatermarks(WaterMarker)
.setParallelism(1) // see Remark 1
streamFork1 = timestampedStream.flatMap(FlatMapFunction1)
streamFork2 = timestampedStream.flatMap(FlatMapFunction2)
streamFork1
.keyBy(keySelector)
.window(SlidingEventTimeWindow)
.process(ProcessWindowFunction1) // stateful (global)
.addSink(MessageBusSink)
internalStream = streamFork2
.keyBy(keySelector)
.window(SlidingEventTimeWindow)
internalStream
.process(ProcessWindowFunction2) // stateful (window)
.addSink(DatabaseSink1)
.setParallelism(1)
internalStream
.process(ProcessWindowFunction3) // stateful (global)
.addSink(DatabaseSink2)
.setParallelism(1)
备注 1:源不在我的控制之下,所以我不能在源上分配时间戳。这就是为什么我需要分配具有并行度 = 1 的水印,因为流中的某些分区实际上可能是空的(至少有一段时间)。
还有我的水印逻辑(类实现AssignerWithPeriodicWatermarks
,备注如下):
private final TemporalUnit slideTime;
private Instant maxEventTime = null;
private Instant maxEventTimeTruncated = Instant.ofEpochMilli(0L);
private Instant lastWatermarkTimeTruncated = Instant.ofEpochMilli(0L);
private long forceAdvanceMultiplier = 1L;
public TimestampExtractorAndPeriodicWatermarker(TemporalUnit slideTime) {
this.slideTime = slideTime;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
if (maxEventTime == null) {
return null;
}
Instant truncatedInstant = Instant.now().truncatedTo(slideTime);
if (Duration.between(maxEventTimeTruncated, maxEventTime).compareTo(slideTime.getDuration()) >= 0) {
// generate watermark when the newest event time is >= max (truncated) event time + slide time
lastWatermarkTimeTruncated = truncatedInstant;
maxEventTimeTruncated = maxEventTime.truncatedTo(slideTime);
return new Watermark(maxEventTime.toEpochMilli());
} else if (truncatedInstant.compareTo(lastWatermarkTimeTruncated) > 0) {
// generate watermark every "slide" time if no new events arrive
lastWatermarkTimeTruncated = truncatedInstant;
Instant timeToForceAdvanceFlinkTime = this.maxEventTime
.truncatedTo(slideTime)
.plus(slideTime.getDuration().multipliedBy(forceAdvanceMultiplier++));
return new Watermark(timeToForceAdvanceFlinkTime.toEpochMilli());
}
return null;
}
@Override
public long extractTimestamp(T t, long l) {
long elemTS = t.getTimestamp();
if (maxEventTime == null) {
maxEventTime = Instant.ofEpochMilli(elemTS);
} else {
maxEventTime = Instant.ofEpochMilli(Math.max(elemTS, maxEventTime.toEpochMilli()));
}
forceAdvanceMultiplier = 1L;
return elemTS;
}
我的系统的吞吐量可能非常低,但即使每分钟没有新数据到达,我也需要关闭窗口,这就是为什么我需要使用forceAdvanceMultiplier
. 我相信线程安全在这里不是问题,但我可能是错的。
解决方案
在大多数情况下,这应该是可行的——在处理历史数据或模拟数据时,可以获得可重复的、确定性的结果。但是,做导致不确定性的事情也相当容易。没有更多信息,只能推测原因。
Flink 中的非确定性源于以下两种情况之一:(1) 编写一个对并行管道之间的竞争敏感的作业,以及 (2) 使用处理时间而不是事件时间(您可能认为这是事件和系统时钟)。
对于任何给定的键,将只有一个窗口操作符的单线程实例,并且随着水印的上升,它将按顺序触发其事件时间窗口。如果结果在单个键中不一致,那么我怀疑(1)水印实现不正确,或者在某种程度上取决于系统时钟而不是事件时间时间戳,或者(2)单个键的事件来了来自多个源实例,并且您的窗口计算对这些独立源之间的竞争很敏感。
即使每个键中的结果是一致的,如果您随后将这些每个键的结果组合成某种全局结果,在最后的组合阶段引入一些不确定性也很容易。
更新:
我相信你的水印会产生一些不确定数量的后期事件,这会导致不可重现的结果。让我解释。
源是从并行读取的,如果我理解正确,则按时间戳排序。但是,随后将并行度减少到 1 以进行水印处理。此时,多个(可能)有序流交织在一起,导致单个无序流。然后水印使用 maxEventTime 作为当前水印,没有从中减去一些延迟来解决乱序,这可能导致乱序事件延迟。
您可以通过向窗口添加侧面输出以显示任何迟到的事件来确认此诊断。最简单的解决方法是将源的并行度设置为 1。
推荐阅读
- angular - *ngFor 不显示
- python - 在python中选择不替换
- javascript - JQuery & AJAX 获取文本而不是值
- spring-cloud - Spring Cloud Gateway - 基于Header的重定向
- angular - 如果值在模板角度 2/5 中未定义
- xslt - 如何使用 XSLT 在 pageTarget 中添加 playOrder 序列?
- javascript - 部署新更改后未显示 Firebase 功能
- powershell - 使用 Powershell 获取下一个可用的 IIS 绑定
- java - 创建并使用 loadUserByEmail 而不是 loadUserByUsername
- apache-spark - java.lang.OutOfMemoryError:PYSPARK 中超出了 GC 开销限制