首页 > 解决方案 > Flink 事件会话窗口不发出记录

问题描述

我正在使用 eventSessionWindow 为由 id 和窗口键入的用户编写一个管道来分组会话。我正在使用 Periodic WM 和一个自定义会话累加器,它将计算事件是一个给定的会话。

正在发生的是我的窗口操作员正在消费记录但没有发出。我不确定这里缺少什么。


FlinkKafkaConsumer010<String> eventSource =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), properties);
        eventSource.setStartFromLatest();

DataStream<Event> eventStream = env.addSource(eventSource
        ).flatMap(
                new FlatMapFunction<String, Event>() {

                    @Override
                    public void flatMap(String value, Collector<Event> out) throws Exception {
                        out.collect(Event.toEvent(value));
                    }
                }
        ).assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<Event>() {
                    long maxTime;

                    @Override
                    public long extractTimestamp(Event element, long previousElementTimestamp) {
                        maxTime = Math.max(previousElementTimestamp, maxTime);
                        return previousElementTimestamp;
                    }

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(maxTime);
                    }
                }
        );

       DataStream <Session> session_stream =eventStream.keyBy((KeySelector<Event, String>)value -> value.id)
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))

                .aggregate(new AggregateFunction<Event, pipe.SessionAccumulator, Session>() {
                    @Override
                    public pipe.SessionAccumulator createAccumulator() {
                        return new pipe.SessionAccumulator();
                    }

                    @Override
                    public pipe.SessionAccumulator add(Event e, pipe.SessionAccumulator sessionAccumulator) {
                        sessionAccumulator.add(e);
                        return sessionAccumulator;
                    }

                    @Override
                    public Session getResult(pipe.SessionAccumulator sessionAccumulator) {
                        return sessionAccumulator.getLocalValue();
                    }

                    @Override
                    public pipe.SessionAccumulator merge(pipe.SessionAccumulator prev, pipe.SessionAccumulator next) {
                        prev.merge(next);
                        return prev;
                    }

                }, new WindowFunction<Session, Session, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow timeWindow, Iterable<Session> iterable, Collector<Session> collector) throws Exception {
                        collector.collect(iterable.iterator().next());
                    }
                });


    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();
        }

        @Override
        public void add(Event e) {
            session.add(e);

        }

        @Override
        public Session getLocalValue() {
            return session;
        }

        @Override
        public void resetLocal() {
            session =  new Session();

        }

        @Override
        public void merge(Accumulator<Event, Session> accumulator) {
            session.merge(Collections.singletonList(accumulator.getLocalValue()));

        }

        @Override
        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(
                    session.id,
            );
            return sessionAccumulator;
        }
    }


    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();
        }

        @Override
        public void add(Event e) {
            session.add(e);

        }

        @Override
        public Session getLocalValue() {
            return session;
        }

        @Override
        public void resetLocal() {
            session =  new Session();

        }

        @Override
        public void merge(Accumulator<Event, Session> accumulator) {
            session.merge(Collections.singletonList(accumulator.getLocalValue()));

        }

        @Override
        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(
                    session.id,
                    session.lastEventTime,
                    session.earliestEventTime,
                    session.count;

            );
            return sessionAccumulator;
        }
    }

标签: flink-streaming

解决方案


如果您的水印没有前进,这可以解释为什么窗口没有发出任何结果。可能的原因包括:

  • Kafka 没有为您的事件添加时间戳,因此没有设置 previousElementTimestamp。
  • 您有一个空闲的 Kafka 分区阻止水印。(这是一个有点复杂的话题。如果这证明是您的问题的原因,并且您被卡住了,请回来提出一个新问题。)

另一种可能性是事件中永远不会有 5 分钟的间隔,在这种情况下,事件将累积在一个永无止境的会话中。

此外,您似乎没有包括水槽。如果您不打印或以其他方式将结果发送到接收器,Flink 将不会做任何事情。

并且不要忘记您必须打电话env.execute()才能让任何事情发生。

其他几件事:

您的水印生成器不允许任何乱序,因此窗口将忽略所有乱序事件(因为它们会迟到)。如果您的事件具有严格递增的时间戳,您应该继续使用AscendingTimestampExtractor;如果它们可能是无序的,那么BoundedOutOfOrdernessTimestampExtractor是合适的。

你的 WindowFunction 是多余的。它只是将聚合器的结果向下游转发,因此您可以将其删除。

您已经发布了 SessionAccumulator 的两种不同实现。


推荐阅读