首页 > 解决方案 > Flink 去重和 processWindowFunction

问题描述

我正在创建一个管道,其中输入是包含时间戳字段的 json 消息,用于设置 eventTime。问题在于某些记录可能会延迟或重复到达系统,这种情况需要进行管理;为避免重复,我尝试了以下解决方案:

                .assignTimestampsAndWatermarks(new RecordWatermark()
                        .withTimestampAssigner(new ExtractRecordTimestamp()))
                .keyBy(new MetricGrouper())
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(3)))
                .process(new WindowedFilter())
                .keyBy(new MetricGrouper())
                .window(TumblingEventTimeWindows.of(Time.seconds(180)))
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(15)))
                .process(new WindowedCountDistinct())
                .map((value) -> value.toString());

其中第一个窗口操作用于根据保存在集合中的时间戳过滤记录,如下所示:

public class WindowedFilter extends ProcessWindowFunction<MetricObject, MetricObject, String, TimeWindow> {
    HashSet<Long> previousRecordTimestamps = new HashSet<>();

    @Override
    public void process(String s, Context context, Iterable<MetricObject> inputs, Collector<MetricObject> out) throws Exception {
        String windowStart = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(context.window().getStart()));
        String windowEnd = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(context.window().getEnd()));
        log.info("window start: '{}', window end: '{}'", windowStart, windowEnd);

        Long watermark = context.currentWatermark();
        log.info(inputs.toString());
        for (MetricObject in : inputs) {
            Long recordTimestamp = in.getTimestamp().toEpochMilli();
            if (!previousRecordTimestamps.contains(recordTimestamp)) {
                log.info("timestamp not contained");
                previousRecordTimestamps.add(recordTimestamp);
                out.collect(in);
            }
        }
    }

这个解决方案有效,但我觉得我没有考虑重要的事情,或者可以以更好的方式完成。

标签: javaapache-flink

解决方案


使用 windows 进行重复数据删除的一个潜在问题是,在 Flink 的 DataStream API 中实现的 windows 总是与 epoch 对齐。这意味着,例如,发生在 11:59:59 的事件和发生在 12:00:01 的重复事件将被放置到不同的一分钟窗口中。

但是,在您的情况下,您关注的重复项似乎也带有相同的时间戳。在这种情况下,只要您不担心水印会产生延迟事件,您所做的就会产生正确的结果。

使用 windows 进行重复数据删除的另一个问题是它们对管道施加的延迟,以及用于最小化延迟的解决方法。

RichFlatMapFunction这就是为什么我更喜欢使用 a或 a来实现重复数据删除KeyedProcessFunction。像这样的东西会比窗口表现得更好:

private static class Event {
  public final String key;
}

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  
  env.addSource(new EventSource())
    .keyBy(e -> e.key)
    .flatMap(new Deduplicate())
    .print();
  
  env.execute();
}

public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
  ValueState<Boolean> seen;

  @Override
  public void open(Configuration conf) {
    StateTtlConfig ttlConfig = StateTtlConfig
      .newBuilder(Time.minutes(1))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .cleanupFullSnapshot()
      .build();
    ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
    desc.enableTimeToLive(ttlConfig);
    seen = getRuntimeContext().getState(desc);
  }

  @Override
  public void flatMap(Event event, Collector<Event> out) throws Exception {
    if (seen.value() == null) {
      out.collect(event);
      seen.update(true);
    }
  }
}

这里流被 删除重复key,并且所涉及的状态在一分钟后被自动清除。


推荐阅读