Can Flink produce hourly snapshots of aggregated/rolling/accumulated data?

Question

The textbook example of stream processing is a timestamped word count program. Given the following data sample:

mario 10:00
luigi 10:01
mario 11:00
mario 12:00

I have seen word counts produced over:

A total data set

mario 3
luigi 1

A set of time window partitions

mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 1
mario 12:00-13:00 1

I have not found, however, an example of a word count program over a rolling time window, i.e., a word count produced hourly for every word seen since the beginning of time:

mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 2
luigi 11:00-12:00 1
mario 12:00-13:00 3
luigi 12:00-13:00 1

Is this possible with Apache Flink or any other stream processing library? Thanks!

Edit:

So far I have attempted a variant of David Anderson's approach, substituting event time for processing time since the data is timestamped. It's not working as I expected, though. Here's the code, the sample data, the results it produces, and my follow-up questions:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            .setParallelism(1)
            .setMaxParallelism(1);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    String fileLocation = "full file path here";
    DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);

    rawInput.flatMap(parse())
            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
                        // emit a watermark one unit behind each element's timestamp
                        return new Watermark(extractedTimestamp - 1);
                }

                @Override
                public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
                    return element.getTimestamp();
                }
            })
            .keyBy(TimestampedWord::getWord)
            .process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
                private transient ValueState<Long> count;

                @Override
                public void open(Configuration parameters) throws Exception {
                    count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
                }

                @Override
                public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    if (count.value() == null) {
                        count.update(0L);
                    }

                    // round the timestamp up to the next multiple of 10 and register an event-time timer there
                    long nextWindowEnd = ((value.getTimestamp() / 10) + 1) * 10;
                    ctx.timerService().registerEventTimeTimer(nextWindowEnd);

                    count.update(count.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    long currentWatermark = ctx.timerService().currentWatermark();
                    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
                }
            })
            .addSink(new PrintlnSink());

    env.execute();
}

private static long fileCounter = 0;

private static FlatMapFunction<String, TimestampedWord> parse() {
    return new FlatMapFunction<String, TimestampedWord>() {
        @Override
        public void flatMap(String value, Collector<TimestampedWord> out) {
            // assign each word a consecutive "timestamp" based on its position in the file
            out.collect(new TimestampedWord(value, fileCounter++));
        }
    };
}

private static class TimestampedWord {
    private final String word;
    private final long timestamp;

    private TimestampedWord(String word, long timestamp) {
        this.word = word;
        this.timestamp = timestamp;
    }

    public String getWord() {
        return word;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
    @Override
    public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
        System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + value.getField(2));
    }
}

With a file containing the following words, one per line:

mario,luigi,mario,mario,vilma,fred,bob,bob,mario,dan,dylan,dylan,fred,mario,mario,carl,bambam,summer,anna,anna,edu,anna,anna,anna,anna,anna

the program produces the following output:

mario=4 at 10
luigi=1 at 10
dan=1 at 10
bob=2 at 10
fred=1 at 10
vilma=1 at 10
dylan=2 at 20
fred=2 at 20
carl=1 at 20
anna=3 at 20
summer=1 at 20
bambam=1 at 20
mario=6 at 20
anna=7 at 9223372036854775807
edu=1 at 9223372036854775807

Something is clearly wrong. I'm getting a count of 3 for anna at 20 even though the third instance of the word anna doesn't appear until position 22. Strangely enough, edu appears only in the last snapshot even though it appeared before anna's third instance. How could I trigger a snapshot to be produced every 10 "units of time" even if no messages arrive (i.e., the same counts would be reported again)?

If anyone could point me in the right direction I'd be very thankful!

Tags: apache-flink, stream-processing

Solution

Yes, this is not only possible to do with Flink, but it's easy. You can do this with a KeyedProcessFunction that maintains a counter in keyed state for the number of times each word/key has appeared so far in the input stream. Then use a timer to trigger the reporting.

Here's an example that uses processing-time timers. It prints out a report every 10 seconds.

public class DSExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1))
            .keyBy(x -> x)
            .process(new KeyedProcessFunction<String, String, Tuple3<Long, String, Integer>>() {
                private transient ValueState<Integer> counter;

                @Override
                public void open(Configuration parameters) throws Exception {
                    counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));
                }

                @Override
                public void processElement(String s, Context context, Collector<Tuple3<Long, String, Integer>> collector) throws Exception {
                    if (counter.value() == null) {
                        // first time this word has been seen: initialize its count
                        // and schedule a timer for the next 10-second boundary
                        counter.update(0);
                        long now = context.timerService().currentProcessingTime();
                        context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    }
                    counter.update(counter.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext context, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
                    // schedule the next report before emitting, so reports continue
                    // for this word even if no further events arrive
                    long now = context.timerService().currentProcessingTime();
                    context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    out.collect(new Tuple3<>(now, context.getCurrentKey(), counter.value()));
                }
            })
            .print();

        env.execute();
    }
}
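To try it out, start a local socket server first (for example with nc -lk 9999) and then type words one per line; every 10 seconds the job prints the cumulative count for each word seen so far.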

Updated:

It's always better to use event time, but this does add complexity. Most of the added complexity arises from the fact that in real applications you are very likely to have to deal with out-of-order events -- which you have avoided in your example, so in this case we can get away with a fairly simple implementation.

If you change two things, you'll get the results you expect. First, setting the Watermarks to be extractedTimestamp - 1 is the reason why the results are wrong (e.g., this is why anna=3 at 20). If you set the Watermark to extractedTimestamp instead, this problem will go away.
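Concretely, applying that fix to the assigner from the question, only the watermark line changes:

.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
        // emit the watermark at the element's own timestamp, not one unit behind it
        return new Watermark(extractedTimestamp);
    }

    @Override
    public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
        return element.getTimestamp();
    }
})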

Explanation: It is the arrival of the third anna that creates the Watermark that closes the window at time 20. The third anna has a timestamp of 21, and so it is followed in the stream by a Watermark at 20, which closes the second window and produces the report saying anna=3. Yes, the first edu arrived earlier, but it was the first edu, with a timestamp of 20. At the time edu arrives there is no timer set for edu, and the timer that gets created is correctly set to fire at 30, so we don't hear about edu until a Watermark of at least 30 arrives.
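Traced on the sample data (timestamps are the 0-based positions from the file):

anna #1 (ts 18) and anna #2 (ts 19) register a timer for time 20
edu (ts 20) is the first edu, so its timer is set for time 30
anna #3 (ts 21) is counted first; then its punctuated watermark, 21 - 1 = 20, follows it downstream
the watermark at 20 fires anna's timer, reporting anna=3, while edu stays silent until a watermark of at least 30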

The other problem is the timer logic. Flink creates a separate timer for every key, and you need to create a new timer every time a timer fires. Otherwise you will only get reports about words that arrived during the window. You should modify the code to be more like this:

@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    if (count.value() == null) {
        count.update(0L);
        setTimer(ctx.timerService(), value.getTimestamp());
    }

    count.update(count.value() + 1);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    long currentWatermark = ctx.timerService().currentWatermark();
    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
    // Long.MAX_VALUE is the final watermark, emitted when the input ends,
    // so don't schedule another timer in that case
    if (currentWatermark < Long.MAX_VALUE) {
        setTimer(ctx.timerService(), currentWatermark);
    }
}

private void setTimer(TimerService service, long t) {
    // fire at the end of the 10-unit window containing time t
    service.registerEventTimeTimer(((t / 10) + 1) * 10);
}

With these changes, I get these results:

mario=4 at 10
luigi=1 at 10
fred=1 at 10
bob=2 at 10
vilma=1 at 10
dan=1 at 10
vilma=1 at 20
luigi=1 at 20
dylan=2 at 20
carl=1 at 20
bambam=1 at 20
mario=6 at 20
summer=1 at 20
anna=2 at 20
bob=2 at 20
fred=2 at 20
dan=1 at 20
fred=2 at 9223372036854775807
dan=1 at 9223372036854775807
carl=1 at 9223372036854775807
dylan=2 at 9223372036854775807
vilma=1 at 9223372036854775807
edu=1 at 9223372036854775807
anna=7 at 9223372036854775807
summer=1 at 9223372036854775807
bambam=1 at 9223372036854775807
luigi=1 at 9223372036854775807
bob=2 at 9223372036854775807
mario=6 at 9223372036854775807

Now, if you needed to actually handle out-of-order events, this would get quite a bit more complicated. The watermarks would need to lag behind the timestamps by some realistic amount, reflecting the actual out-of-orderness present in the stream, and you would then have to cope with more than one window being open at a time. A given event/word might not belong to the window that will close next, and so shouldn't increment its counter. You might, for example, buffer these "early" events in another piece of state (e.g., ListState), or maintain multiple counters, perhaps in MapState, as sketched below. Furthermore, some events might be late, invalidating earlier reports, and you'd want to define some policy for handling that.
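Here is a minimal sketch of the MapState idea (my own illustration, not part of the original answer). It reuses the TimestampedWord type and the 10-unit windows from the question, buckets each key's counts by the end of the window each event falls into, and folds a bucket into the running total only once the watermark passes that window's end. It assumes the out-of-orderness is bounded and already reflected in the watermarks; truly late events would still need the separate policy mentioned above.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class RollingCountSketch
        extends KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>> {

    private transient MapState<Long, Long> bucketsByWindowEnd; // window end -> events in that window
    private transient ValueState<Long> total;                  // cumulative count reported so far

    @Override
    public void open(Configuration parameters) {
        bucketsByWindowEnd = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buckets", Long.class, Long.class));
        total = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Long.class));
    }

    @Override
    public void processElement(TimestampedWord value, Context ctx,
                               Collector<Tuple3<String, Long, Long>> out) throws Exception {
        // an out-of-order (but not late) event still lands in the bucket for its own window
        long windowEnd = ((value.getTimestamp() / 10) + 1) * 10;
        Long bucket = bucketsByWindowEnd.get(windowEnd);
        bucketsByWindowEnd.put(windowEnd, bucket == null ? 1L : bucket + 1);
        // timers are deduplicated per key and timestamp, so this is safe to repeat
        ctx.timerService().registerEventTimeTimer(windowEnd);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple3<String, Long, Long>> out) throws Exception {
        // the watermark has passed this window's end: fold its bucket into the total
        Long bucket = bucketsByWindowEnd.get(timestamp);
        long newTotal = (total.value() == null ? 0L : total.value())
                + (bucket == null ? 0L : bucket);
        total.update(newTotal);
        bucketsByWindowEnd.remove(timestamp);
        out.collect(new Tuple3<>(ctx.getCurrentKey(), newTotal, timestamp));
        // keep reporting this key every 10 units, as in the corrected code above
        if (ctx.timerService().currentWatermark() < Long.MAX_VALUE) {
            ctx.timerService().registerEventTimeTimer(timestamp + 10);
        }
    }
}

Because the buckets are keyed by window end, an "early" event simply lands in a bucket whose timer hasn't fired yet, so no separate buffering is needed in this variant.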

