java - SlidingEventTimeWindows does not produce any output
Problem description
I have a stream execution configured as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Record> stream = env.addSource(new FlinkKafkaConsumer(
        SystemsCpu.TOPIC,
        ConfluentRegistryAvroDeserializationSchema.forGeneric(SystemsCpu.SCHEMA, registry),
        config)
    .setStartFromLatest());
DataStream<Anomaly> anomalies = stream
    .keyBy(x -> x.get("host").toString())
    .window(SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20))) // produces output with TumblingEventTimeWindows
    .process(new AnomalyDetector())
    .name("anomaly-detector");

public class AnomalyDetector extends ProcessWindowFunction<Record, Anomaly, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<Record> input, Collector<Anomaly> out) {
        var anomaly = new Anomaly();
        anomaly.setValue(1.0);
        out.collect(anomaly);
    }
}
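But for some reason, with SlidingEventTimeWindows it does not produce any output for AnomalyDetector to process (i.e., process is never triggered at all). If I use TumblingEventTimeWindows instead, for example, it works as expected.
Any ideas what might be causing this? Am I using SlidingEventTimeWindows incorrectly?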
Solution
When doing any sort of event-time windowing, you must provide a WatermarkStrategy. Watermarks mark a spot in the stream and signal that the stream is complete up through some specific point in time. Event-time windows can only be triggered by the arrival of a sufficiently large watermark, i.e. one that has reached the end of the window. Without a WatermarkStrategy no watermarks are generated, so the windows never fire.
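To make the triggering rule concrete, here is a minimal, self-contained simulation in plain Java. It is not Flink code: the class WindowFiringSketch and its methods are hypothetical, and the model only captures the rule that an event-time window [start, end) fires once the watermark reaches end - 1. Until a watermark arrives, every window stays pending.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of event-time window firing (not Flink's API).
// A window covers [start, end) and fires once the watermark reaches end - 1.
final class WindowFiringSketch {

    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Last millisecond that still belongs to the window.
        long maxTimestamp() {
            return end - 1;
        }
    }

    private final List<Window> pending = new ArrayList<>();
    // No watermark has been seen yet.
    private long currentWatermark = Long.MIN_VALUE;

    void addWindow(Window window) {
        pending.add(window);
    }

    // Advances the watermark (it never moves backwards) and returns the
    // windows that fire as a result; fired windows are no longer pending.
    List<Window> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Window> fired = new ArrayList<>();
        for (Iterator<Window> it = pending.iterator(); it.hasNext(); ) {
            Window w = it.next();
            if (w.maxTimestamp() <= currentWatermark) {
                fired.add(w);
                it.remove();
            }
        }
        return fired;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In terms of this model, the job in the question never calls onWatermark at all, so every window stays pending and AnomalyDetector.process is never invoked.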
See the documentation for details; a typical setup looks something like this:
DataStream<MyType> timestampedEvents = events
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((event, timestamp) -> event.timestamp));
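As a rough mental model, forBoundedOutOfOrderness(Duration.ofSeconds(10)) keeps the watermark 10 seconds (plus one millisecond) behind the largest event timestamp seen so far. The sketch below assumes that model; BoundedOutOfOrdernessSketch and its methods are hypothetical names, not Flink classes.

```java
// Hypothetical, simplified bounded-out-of-orderness watermark generator:
// the watermark trails the largest timestamp seen so far by the bound, minus 1 ms.
final class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflowing.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Out-of-order events never move the maximum (and thus the watermark) backwards.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }
}
```

Under this model, with a 10-second bound a window ending at 60 000 ms can only fire after an event with a timestamp of at least 70 000 ms has been seen.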
However, since you are using Kafka, it's usually better to have the Flink Kafka consumer do the watermarking:
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy...);
DataStream<MyType> stream = env.addSource(kafkaSource);
Note that if you use the latter approach, and if your events are in temporal order within each Kafka partition, you can take advantage of the per-partition watermarking that the Flink Kafka source provides and use WatermarkStrategy.forMonotonousTimestamps() rather than the bounded-out-of-orderness strategy. This has a number of advantages; for example, the watermark no longer lags behind by a fixed out-of-orderness bound, so windows can fire sooner.
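The benefit of per-partition watermarking can be illustrated with a small simulation. It assumes two Kafka partitions whose events are each in timestamp order but are consumed interleaved, and compares a single monotonous-timestamp watermark for the whole stream against one watermark per partition, combined by taking the minimum. Everything here, including PerPartitionWatermarkSketch and the {partition, timestamp} event encoding, is hypothetical and heavily simplified (idle partitions, for instance, are ignored).

```java
import java.util.Arrays;

// Hypothetical simulation (not Flink code): one monotonous-timestamp watermark
// for the whole stream versus one per partition, combined by taking the minimum.
final class PerPartitionWatermarkSketch {

    // Monotonous timestamps: watermark = largest timestamp seen so far - 1 ms.
    private static long monotonousWatermark(long maxTimestamp) {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    // Each event is {partition, timestamp}, in consumption order. Counts events
    // whose timestamp is already at or behind the watermark when they arrive.
    static int lateEventsWithGlobalWatermark(long[][] events) {
        long max = Long.MIN_VALUE;
        int late = 0;
        for (long[] e : events) {
            if (e[1] <= monotonousWatermark(max)) {
                late++;
            }
            max = Math.max(max, e[1]);
        }
        return late;
    }

    static int lateEventsWithPerPartitionWatermarks(long[][] events, int partitions) {
        long[] maxPerPartition = new long[partitions];
        Arrays.fill(maxPerPartition, Long.MIN_VALUE); // nothing seen yet
        int late = 0;
        for (long[] e : events) {
            if (e[1] <= combinedWatermark(maxPerPartition)) {
                late++;
            }
            int p = (int) e[0];
            maxPerPartition[p] = Math.max(maxPerPartition[p], e[1]);
        }
        return late;
    }

    // The combined watermark cannot advance past a partition that is still behind.
    private static long combinedWatermark(long[] maxPerPartition) {
        long min = Long.MAX_VALUE;
        for (long m : maxPerPartition) {
            min = Math.min(min, monotonousWatermark(m));
        }
        return min;
    }
}
```

With the events {0, 1000}, {0, 2000}, {0, 3000}, {1, 1500}, {1, 2500}, the single global watermark has already advanced to 2999 by the time partition 1 is read, so both of its events arrive behind it; with per-partition watermarks none do.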
By the way, though this is unrelated to your question, you should be aware that by specifying SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20)), every event will be copied into each of 60 overlapping windows (20 minutes / 20 seconds = 60).
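The sketch below enumerates the windows a single timestamp falls into, assuming a zero window offset and non-negative timestamps. It is modeled on the start-alignment loop used by Flink's sliding event-time assigner, but SlidingWindowAssignmentSketch is a hypothetical standalone helper, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists every sliding window [start, end) that contains
// a timestamp (offset assumed to be 0, timestamps assumed to be >= 0).
final class SlidingWindowAssignmentSketch {

    static List<long[]> assignWindows(long timestamp, long sizeMillis, long slideMillis) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window containing the timestamp.
        long lastStart = timestamp - (timestamp % slideMillis);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - sizeMillis; start -= slideMillis) {
            windows.add(new long[] {start, start + sizeMillis});
        }
        return windows;
    }
}
```

With a size of 1 200 000 ms (20 minutes) and a slide of 20 000 ms, any timestamp lands in 60 windows; setting the slide equal to the size (a tumbling window) yields exactly one window per event.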