java - Apache Flink CEP如何检测事件是否在x秒内没有发生?
问题描述
例如,A 应该在 10 秒内跟随 B。我知道如何跟踪此 DID 是否发生(.next,.within),但如果 B 从未在窗口内发生,我想发送警报。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
// env.enableCheckpointing(1000);
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.build();
final DataStream<String> inputStream = env
.addSource(new RMQSource<String>(
connectionConfig, // config for the RabbitMQ connection
"cep", // name of the RabbitMQ queue to consume
true, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
.setParallelism(1); // non-parallel source is only required for exactly-once
inputStream.print();
Pattern<String, ?> simplePattern =
Pattern.<String>begin("start")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String event) {
return event.equals("A");
}
})
.next("end")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String event) {
return event.equals("B");
}
});
PatternStream<String> timedOutPatternStream = CEP.pattern(inputStream, simplePattern.within(Time.seconds(10)));
OutputTag<String> timedout = new OutputTag<String>("timedout"){};
SingleOutputStreamOperator<String> timedOutNotificationsStream = timedOutPatternStream.flatSelect(
timedout,
new TimedOut<String>(),
new FlatSelectNothing<String>()
);
timedOutNotificationsStream.getSideOutput(timedout).print();
env.execute("mynotification");
}
public static class TimedOut<String> implements PatternFlatTimeoutFunction<String, String> {
@Override
public void timeout(Map<java.lang.String, List<String>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
out.collect((String) "LATE!");
}
}
public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
@Override
public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {}
}
实际行为:
publish "A"
(wait 5 seconds)
publish "B"
=> (no alert)
publish "A"
(wait 10 seconds)
=> (no alert, but should be)
publish "A"
(wait 10 seconds)
publish "B"
=> "LATE!"
预期行为:
publish "A"
(wait 10 seconds)
=> "LATE!"
解决方案
推荐阅读
- amazon-web-services - 无法使 AWS ECS 服务通过服务发现进行通信
- nginx - 在 NGINX 中将 X-Frame_options 从 Deny always 更改为 Sameorigin
- ruby-on-rails - 在 Rails 中的哪里存储 AWS 密钥?
- idl - 如何升级 REDHAWK SDR 框架以支持 SCA 4.1 规范(软件通信架构)
- bixby - 如何在 Bixby 中重置配置文件存储的数据
- excel - Datagrid 在 WPF 中使用 for 循环给出空值
- windows - 如何从 Windows 服务应用程序运行包含 OpenSSL 加密文件命令的 Windows 批处理文件
- java - 在 ios 真实设备中启动应用程序
- angular - Angular Router:将模块加载为延迟加载模块的子模块
- javascript - “客户端需要将 JSON 转换为 HTML”含义