java - Flink 去重和 processWindowFunction
问题描述
我正在创建一个管道,其中输入是包含时间戳字段的 json 消息,用于设置 eventTime。问题在于某些记录可能会延迟或重复到达系统,这种情况需要进行管理;为避免重复,我尝试了以下解决方案:
.assignTimestampsAndWatermarks(new RecordWatermark()
.withTimestampAssigner(new ExtractRecordTimestamp()))
.keyBy(new MetricGrouper())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(3)))
.process(new WindowedFilter())
.keyBy(new MetricGrouper())
.window(TumblingEventTimeWindows.of(Time.seconds(180)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(15)))
.process(new WindowedCountDistinct())
.map((value) -> value.toString());
其中第一个窗口操作用于根据保存在集合中的时间戳过滤记录,如下所示:
public class WindowedFilter extends ProcessWindowFunction<MetricObject, MetricObject, String, TimeWindow> {
HashSet<Long> previousRecordTimestamps = new HashSet<>();
@Override
public void process(String s, Context context, Iterable<MetricObject> inputs, Collector<MetricObject> out) throws Exception {
String windowStart = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(context.window().getStart()));
String windowEnd = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(context.window().getEnd()));
log.info("window start: '{}', window end: '{}'", windowStart, windowEnd);
Long watermark = context.currentWatermark();
log.info(inputs.toString());
for (MetricObject in : inputs) {
Long recordTimestamp = in.getTimestamp().toEpochMilli();
if (!previousRecordTimestamps.contains(recordTimestamp)) {
log.info("timestamp not contained");
previousRecordTimestamps.add(recordTimestamp);
out.collect(in);
}
}
}
这个解决方案有效,但我觉得我没有考虑重要的事情,或者可以以更好的方式完成。
解决方案
使用 windows 进行重复数据删除的一个潜在问题是,在 Flink 的 DataStream API 中实现的 windows 总是与 epoch 对齐。这意味着,例如,发生在 11:59:59 的事件和发生在 12:00:01 的重复事件将被放置到不同的一分钟窗口中。
但是,在您的情况下,您关注的重复项似乎也带有相同的时间戳。在这种情况下,只要您不担心水印会产生延迟事件,您所做的就会产生正确的结果。
使用 windows 进行重复数据删除的另一个问题是它们对管道施加的延迟,以及用于最小化延迟的解决方法。
RichFlatMapFunction
这就是为什么我更喜欢使用 a或 a来实现重复数据删除KeyedProcessFunction
。像这样的东西会比窗口表现得更好:
private static class Event {
public final String key;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicate())
.print();
env.execute();
}
public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> seen;
@Override
public void open(Configuration conf) {
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupFullSnapshot()
.build();
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
desc.enableTimeToLive(ttlConfig);
seen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (seen.value() == null) {
out.collect(event);
seen.update(true);
}
}
}
这里流被 删除重复key
,并且所涉及的状态在一分钟后被自动清除。
推荐阅读
- git - 从 Pycharm 删除后如何恢复已删除的分支
- r - 从refund R包中的拟合pfr对象中提取系数函数
- angular - 类型'employee []上不存在属性'emp' - Angular 6 ng serve --prod
- apache-spark - 了解 Spark 如何将输入文件转换为工作节点
- lambda - 使用 Lambda 函数来使用供应商提供的所有对象
- java - 在这里了解多态性
- webpack - 如何在 webpack 中合理使用 process.env 变量
- saml - 在 Node 中为离线应用程序实施 SAML 身份提供程序?
- python - python Popen 不捕获程序的标准输出。不确定为什么
- ruby - “gem install therubyracer”失败并显示错误消息“libv8 需要安装 python 2 才能构建”