apache-flink - 如何使用 flink cep 库检测 babbling 模式
问题描述
如何使用 flink cep 库检测 babbling 模式?
示例:假设设备有问题,因此它会不断发布诸如开、关之类的值。如何使用 CEP 检测模式,如果问题存在 30 分钟。我在下面提到的一些示例数据。
OFF 16/08/18 11:38
ON 16/08/18 11:38
OFF 16/08/18 11:38
ON 16/08/18 11:37
OFF 16/08/18 11:37
ON 16/08/18 11:36
OFF 16/08/18 11:36
OFF 16/08/18 11:36
ON 16/08/18 11:36
OFF 16/08/18 11:35
ON 16/08/18 11:35
ON 16/08/18 11:34
OFF 16/08/18 11:34
解决方案
如果您的流是按时间排序的(只对流为每个单独的设备进行排序很重要),那么您可以轻松地转换流以使分析更容易。像RichFlatMapFunction
这样会将 ON OFF 事件序列转换为状态 CHANGE 事件序列:
static class DetectChanges extends RichFlatMapFunction<String, String> {
private transient ValueState<String> previousState;
@Override
public void open(Configuration parameters) throws Exception {
previousState = getRuntimeContext().getState(new ValueStateDescriptor<>("previousState", String.class));
}
@Override
public void flatMap(String onOrOff, Collector<String> out) throws Exception {
if (previousState.value() != onOrOff) {
out.collect("CHANGE");
previousState.update(onOrOff);
}
}
}
现在问题已简化为确定流在一段时间内是否有一定数量的 CHANGE 事件。这可以通过滑动窗口轻松完成,或者如果您愿意,也可以使用 CEP。
您也可以完全使用 CEP 来做到这一点。从概念上讲,您可以按如下方式处理:
- 定义匹配 ON+ OFF+ 的单个模式
- 然后定义一个与该 ON/OFF 模式匹配的模式组,只要它在某个时间间隔内出现n次
推荐阅读
- arrays - How to pass data into swiftui view and access it
- couchbase - 如何加入一个数组中的元素,该数组也是另一个数组的一部分,以在沙发库中获得最佳性能?
- python - 动态变化
- linux - 从 crontab 重启 ea-tomcat85
- python - 如何按字符串中最后找到的数字拆分字符串?
- java - 如何从我的应用程序中打开 Google Play 商店应用程序页面?
- python - auto_increment 主键和链接表(Mysql)
- r - 分离单元格内的数据并复制行数据
- google-sheets - 将 Google 协作平台数据提取到 Google 表格中
- amazon-web-services - aws signature get api 中出现缺少参数和 400 错误