java - Apache Flink CEP如何根据事件值传递时间窗口?
问题描述
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
有没有办法可以Time.seconds(10)
用value.getSomeTimeField()
我通过的方式代替Event
?
解决方案
我猜你想以事件时间的方式工作。有关它的更多信息,您可以查看此文档和有关如何从元素中提取时间戳的本节。
在您的示例中,您可以执行以下操作:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input = ...
input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return value.getSomeTimeField();
}
})
CEP.pattern(input, pattern).select(...)
这样,事件将在流中自动排序,并且超时将在两种情况下应用于时间字段。
推荐阅读
- docker - 仅当以 root 身份启动 Gradle Docker 容器时,Gradle 项目的 Jenkins 构建才会成功
- alloy - 定义限制飞机容量的合金事实
- php - 需要帮助在 php 中为 AWS MediaConvert 创建作业设置
- java - 检查是否在控制台中按下了某个键
- javascript - 作为在 Laravel 中翻译数据库记录的最佳解决方案,您有什么建议?
- javascript - 使用对象更改文本中的单词 - javascript
- algorithm - 将划分的多边形分组为 N 个连续的形状
- ssl - TLS 扩展 pre_sharek_key
- macos-catalina - 有没有办法按需重启 quicklookd?
- python - 子图 python Matplotlib 不使用 groupby().sum()