首页 > 解决方案 > 如何使用 flink cep 库检测 babbling 模式

问题描述

如何使用 flink cep 库检测 babbling 模式?

示例:假设设备有问题,因此它会不断发布诸如开、关之类的值。如何使用 CEP 检测模式,如果问题存在 30 分钟。我在下面提到的一些示例数据。

OFF     16/08/18 11:38
ON      16/08/18 11:38
OFF     16/08/18 11:38
ON      16/08/18 11:37
OFF     16/08/18 11:37
ON      16/08/18 11:36
OFF     16/08/18 11:36
OFF     16/08/18 11:36
ON      16/08/18 11:36
OFF     16/08/18 11:35
ON      16/08/18 11:35
ON      16/08/18 11:34
OFF     16/08/18 11:34

标签: apache-flinkflink-streamingflink-cep

解决方案


如果您的流是按时间排序的(只对流为每个单独的设备进行排序很重要),那么您可以轻松地转换流以使分析更容易。像RichFlatMapFunction这样会将 ON OFF 事件序列转换为状态 CHANGE 事件序列:

static class DetectChanges extends RichFlatMapFunction<String, String> {
    private transient ValueState<String> previousState;

    @Override
    public void open(Configuration parameters) throws Exception {
        previousState = getRuntimeContext().getState(new ValueStateDescriptor<>("previousState", String.class));
    }

    @Override
    public void flatMap(String onOrOff, Collector<String> out) throws Exception {

        if (previousState.value() != onOrOff) {
            out.collect("CHANGE");
            previousState.update(onOrOff);
        }
    }
}

现在问题已简化为确定流在一段时间内是否有一定数量的 CHANGE 事件。这可以通过滑动窗口轻松完成,或者如果您愿意,也可以使用 CEP。

您也可以完全使用 CEP 来做到这一点。从概念上讲,您可以按如下方式处理:

  1. 定义匹配 ON+ OFF+ 的单个模式
  2. 然后定义一个与该 ON/OFF 模式匹配的模式组,只要它在某个时间间隔内出现n次

推荐阅读