SlidingEventTimeWindows does not produce any output

Problem description

I have a streaming job configured as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Record> stream = env.addSource(new FlinkKafkaConsumer(
    SystemsCpu.TOPIC,
    ConfluentRegistryAvroDeserializationSchema.forGeneric(SystemsCpu.SCHEMA, registry),
    config)
    .setStartFromLatest());

DataStream<Anomaly> anomalies = stream
    .keyBy(x -> x.get("host").toString())
    .window(SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20))) // produces output with TumblingEventTimeWindows
    .process(new AnomalyDetector())
    .name("anomaly-detector");

public class AnomalyDetector extends ProcessWindowFunction<Record, Anomaly, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Record> input, Collector<Anomaly> out) {
    var anomaly = new Anomaly();
    anomaly.setValue(1.0);
    out.collect(anomaly);
  }
}

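However, for some reason, with SlidingEventTimeWindows it does not produce any output for AnomalyDetector to process (i.e. process is never triggered at all). If I use TumblingEventTimeWindows instead, for example, it works as expected.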

Any ideas what might be causing this? Am I using SlidingEventTimeWindows incorrectly?

Tags: java, apache-flink, flink-streaming

Solution


When doing any sort of event-time windowing, you need to provide a WatermarkStrategy. Watermarks mark a point in the stream and signal that the stream is complete up to some specific point in time. An event-time window can only be triggered by the arrival of a sufficiently large watermark, i.e. one that has reached the end of that window.
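To make the triggering rule concrete, here is a minimal plain-Java model. It is not Flink code, and the class and method names are made up for illustration. It assumes the behavior of Flink's default EventTimeTrigger: a window [start, end) fires once the watermark is at least end - 1 (the window's max timestamp), and the watermark sits at Long.MIN_VALUE until a watermark generator advances it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (not Flink code) of when event-time windows fire.
final class EventTimeTriggerModel {

    // Assumption mirroring Flink: the watermark starts at Long.MIN_VALUE
    // and stays there if no WatermarkStrategy ever advances it.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // A window [start, end) fires once the watermark reaches its max timestamp, end - 1.
    static boolean fires(long windowEnd, long watermark) {
        long maxTimestamp = windowEnd - 1;
        return watermark >= maxTimestamp;
    }

    // Returns the end timestamps of the windows that would have fired at this watermark.
    static List<Long> firedWindowEnds(List<Long> windowEnds, long watermark) {
        List<Long> fired = new ArrayList<>();
        for (long end : windowEnds) {
            if (fires(end, watermark)) {
                fired.add(end);
            }
        }
        return fired;
    }
}
```

With windows ending at 20 000, 40 000 and 60 000 ms, `firedWindowEnds(ends, NO_WATERMARK)` returns an empty list, which is exactly the symptom in the question, while a watermark of 45 000 ms fires the first two windows.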

See the docs for details, but a WatermarkStrategy could be set up something like this:

DataStream<MyType> timestampedEvents = events
  .assignTimestampsAndWatermarks(
      WatermarkStrategy
        .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, timestamp) -> event.timestamp));
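What `forBoundedOutOfOrderness(Duration.ofSeconds(10))` computes can be sketched in plain Java. The class below is hypothetical; it is meant to follow the arithmetic of Flink's BoundedOutOfOrdernessWatermarks, where the emitted watermark trails the highest timestamp seen so far by the allowed out-of-orderness plus 1 ms, but it is a standalone sketch and not the Flink class.

```java
import java.time.Duration;

// Hypothetical plain-Java model of a bounded-out-of-orderness watermark generator.
final class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Chosen so that the watermark before any event is exactly Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every event with its extracted event timestamp.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The periodically emitted watermark: no events older than this are expected anymore.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```

For example, with a 10-second bound and events at 30 000 ms and 25 000 ms, the watermark is 19 999 ms, which is enough to fire a window ending at 20 000 ms. Before any event arrives, the watermark is Long.MIN_VALUE, so nothing fires.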

However, since you are using Kafka, it's usually better to have the Flink Kafka consumer do the watermarking:

FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);

kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy...);

DataStream<MyType> stream = env.addSource(kafkaSource);

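Note that if you use the latter approach, and your events are in temporal order within each Kafka partition, you can take advantage of the per-partition watermarking that the Flink Kafka source provides and use WatermarkStrategy.forMonotonousTimestamps() rather than the bounded-out-of-orderness strategy. This has a number of advantages: for example, because the watermarks are generated per partition and then merged, the interleaving of events from different partitions does not count as out-of-orderness, so you do not need to add an out-of-orderness delay to cover it.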
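The per-partition idea can be sketched with a hypothetical plain-Java model; it is not the Kafka connector's implementation. Each partition's watermark is its highest timestamp minus 1 (what forMonotonousTimestamps() produces for ascending timestamps), and the source emits the minimum over all partitions, so events interleaved from different partitions never make one partition's events look late.

```java
import java.util.Arrays;

// Hypothetical plain-Java model of per-partition watermarks with monotonous timestamps.
final class PerPartitionWatermarks {
    private final long[] maxTimestampPerPartition;

    PerPartitionWatermarks(int partitions) {
        maxTimestampPerPartition = new long[partitions];
        // Long.MIN_VALUE + 1 makes each partition's initial watermark Long.MIN_VALUE.
        Arrays.fill(maxTimestampPerPartition, Long.MIN_VALUE + 1);
    }

    // Record an event read from the given partition.
    void onEvent(int partition, long timestamp) {
        maxTimestampPerPartition[partition] =
            Math.max(maxTimestampPerPartition[partition], timestamp);
    }

    // Monotonous timestamps: watermark = highest timestamp seen - 1.
    long partitionWatermark(int partition) {
        return maxTimestampPerPartition[partition] - 1;
    }

    // The emitted watermark only advances as far as the slowest partition.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (int p = 0; p < maxTimestampPerPartition.length; p++) {
            min = Math.min(min, partitionWatermark(p));
        }
        return min;
    }
}
```

One consequence, in this model as in Flink: a partition that receives no events holds the combined watermark back. WatermarkStrategy#withIdleness exists for that case.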

By the way, although this is unrelated to your question, you should be aware that by specifying SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20)), every event will be copied into each of 60 overlapping windows (20 minutes / 20 seconds = 60).
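The sketch below reproduces, in plain Java, how a sliding event-time assigner maps one timestamp to its windows, assuming the loop used by Flink's SlidingEventTimeWindows with the default offset of 0. The class name and the {start, end} array representation are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of sliding event-time window assignment (offset 0).
final class SlidingWindowAssignment {

    // Returns every window {start, end} (end exclusive) that contains the timestamp.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window containing the timestamp, aligned to the slide.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still contains the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

With a size of 20 minutes (1 200 000 ms) and a slide of 20 seconds (20 000 ms), a timestamp of 1 234 567 ms lands in 60 windows, from [1 220 000, 2 420 000) back to [40 000, 1 240 000). Each of those windows holds the event in its own window state, which is why a large size-to-slide ratio gets expensive.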

