首页 > 解决方案 > TimeWindowAll 函数未在 apache flink 中调用

问题描述

我在 apache flink 中有一个非常简单的流式管道设置,管道工作,我能够将 processFunction 应用于输入数据流,如下所示:

    DataStream<MeasurementData> data = env.addSource(consumer);
    DataStream<MeasurementData> dataProcessed =data.process(new FFT());
    dataProcessed.print();
    dataProcessed.addSink(new FlinkKafkaProducer011<>(
            "localhost:9092",      // Kafka     broker host:port
            OUTPUT_TOPIC,       // Topic to write to
            new MeasurementDataSchema())  // Serializer
    );  

现在我想在某个时间的窗口上应用一个 ProcessWindowFunction 操作,而不是为每个传入的数据点应用该函数。我试过这样:

        DataStream<MeasurementData> dataProcessed = data.timeWindowAll(Time.minutes(5))
       .process(new MyProcessWindowFunction());

以及 MyProcessWindowFunction() 的定义:

public static class MyProcessWindowFunction extends ProcessAllWindowFunction<MeasurementData, MeasurementData, TimeWindow> {

    public void process(Context context, Iterable<MeasurementData> input, Collector<MeasurementData> out) {
        long count = 0;
        for (MeasurementData data : input) {
            for (int frequencyCounter = 0; frequencyCounter < data.data.size(); frequencyCounter++) {
                matrices[frequencyCounter].addElement(data.u, data.v, data.data.get(frequencyCounter).get(0));
            }
            count++;
            out.collect(data);
        }

    }
}

但是这个函数似乎永远不会被调用。我尝试在其中放置打印语句,并使用调试器逐步完成整个程序。有什么我想念的吗?任何提示表示赞赏。

标签: javastreamapache-flink

解决方案


发现问题:环境设置为使用 EventTime 而不是 processingTime,而我的数据不包含任何事件时间戳。


推荐阅读