flink-streaming - Flink 事件会话窗口不发出记录
问题描述
我正在使用 eventSessionWindow 为由 id 和窗口键入的用户编写一个管道来分组会话。我正在使用 Periodic WM 和一个自定义会话累加器,它将计算事件是一个给定的会话。
正在发生的是我的窗口操作员正在消费记录但没有发出。我不确定这里缺少什么。
FlinkKafkaConsumer010<String> eventSource =
new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), properties);
eventSource.setStartFromLatest();
DataStream<Event> eventStream = env.addSource(eventSource
).flatMap(
new FlatMapFunction<String, Event>() {
@Override
public void flatMap(String value, Collector<Event> out) throws Exception {
out.collect(Event.toEvent(value));
}
}
).assignTimestampsAndWatermarks(
new AssignerWithPeriodicWatermarks<Event>() {
long maxTime;
@Override
public long extractTimestamp(Event element, long previousElementTimestamp) {
maxTime = Math.max(previousElementTimestamp, maxTime);
return previousElementTimestamp;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTime);
}
}
);
DataStream <Session> session_stream =eventStream.keyBy((KeySelector<Event, String>)value -> value.id)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.aggregate(new AggregateFunction<Event, pipe.SessionAccumulator, Session>() {
@Override
public pipe.SessionAccumulator createAccumulator() {
return new pipe.SessionAccumulator();
}
@Override
public pipe.SessionAccumulator add(Event e, pipe.SessionAccumulator sessionAccumulator) {
sessionAccumulator.add(e);
return sessionAccumulator;
}
@Override
public Session getResult(pipe.SessionAccumulator sessionAccumulator) {
return sessionAccumulator.getLocalValue();
}
@Override
public pipe.SessionAccumulator merge(pipe.SessionAccumulator prev, pipe.SessionAccumulator next) {
prev.merge(next);
return prev;
}
}, new WindowFunction<Session, Session, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow timeWindow, Iterable<Session> iterable, Collector<Session> collector) throws Exception {
collector.collect(iterable.iterator().next());
}
});
public static class SessionAccumulator implements Accumulator<Event, Session>{
Session session;
public SessionAccumulator(){
session = new Session();
}
@Override
public void add(Event e) {
session.add(e);
}
@Override
public Session getLocalValue() {
return session;
}
@Override
public void resetLocal() {
session = new Session();
}
@Override
public void merge(Accumulator<Event, Session> accumulator) {
session.merge(Collections.singletonList(accumulator.getLocalValue()));
}
@Override
public Accumulator<Event, Session> clone() {
SessionAccumulator sessionAccumulator = new SessionAccumulator();
sessionAccumulator.session = new Session(
session.id,
);
return sessionAccumulator;
}
}
public static class SessionAccumulator implements Accumulator<Event, Session>{
Session session;
public SessionAccumulator(){
session = new Session();
}
@Override
public void add(Event e) {
session.add(e);
}
@Override
public Session getLocalValue() {
return session;
}
@Override
public void resetLocal() {
session = new Session();
}
@Override
public void merge(Accumulator<Event, Session> accumulator) {
session.merge(Collections.singletonList(accumulator.getLocalValue()));
}
@Override
public Accumulator<Event, Session> clone() {
SessionAccumulator sessionAccumulator = new SessionAccumulator();
sessionAccumulator.session = new Session(
session.id,
session.lastEventTime,
session.earliestEventTime,
session.count;
);
return sessionAccumulator;
}
}
解决方案
如果您的水印没有前进,这可以解释为什么窗口没有发出任何结果。可能的原因包括:
- Kafka 没有为您的事件添加时间戳,因此没有设置 previousElementTimestamp。
- 您有一个空闲的 Kafka 分区阻止水印。(这是一个有点复杂的话题。如果这证明是您的问题的原因,并且您被卡住了,请回来提出一个新问题。)
另一种可能性是事件中永远不会有 5 分钟的间隔,在这种情况下,事件将累积在一个永无止境的会话中。
此外,您似乎没有包括水槽。如果您不打印或以其他方式将结果发送到接收器,Flink 将不会做任何事情。
并且不要忘记您必须打电话env.execute()
才能让任何事情发生。
其他几件事:
您的水印生成器不允许任何乱序,因此窗口将忽略所有乱序事件(因为它们会迟到)。如果您的事件具有严格递增的时间戳,您应该继续使用AscendingTimestampExtractor;如果它们可能是无序的,那么BoundedOutOfOrdernessTimestampExtractor是合适的。
你的 WindowFunction 是多余的。它只是将聚合器的结果向下游转发,因此您可以将其删除。
您已经发布了 SessionAccumulator 的两种不同实现。
推荐阅读
- view - 如何为每个设备创建另一个独特的视图?
- javascript - 错误 403 AJAX 请求 JavaScript(instagram 直接消息)
- python - 如何修复“break”命令在 python 中使用键盘和时间输入不起作用?
- raylib - Makefile:2: *** 缺少分隔符。停止。我该如何解决这个问题我是 Raylib for C++ 的新手
- wufoo - 剥离开放价值订阅或产品价值
- javascript - JavaScript 中的简单 MSAL 登录/身份验证
- python - 我刮掉了帖子的评论,但他们没有刮掉
- python - 从打印/输出管理器和版本信息中停止 Selenium ChromeWebdriver?
- linux - 无法在 Linux 中增加打开文件限制
- android - 匕首刀柄:注释类@Singleton 和提供函数@Singleton 之间的区别