apache-flink - 如果在 Apache flink 中 5 秒内发现 5 次相同的数据,则生成 CEP 事件
问题描述
如果我们发现字符串消息在5秒内连续5次以字符“ a ”开头,我需要生成 CEP 事件。
为此,我编写了一个类CEPCharEventPublisher.java,它将字符串消息(如下Published message)发布到 kafka 主题“ charEvent ”
发布消息:
b; date- 2019-06-27 09:05:09.605
a; date- 2019-06-27 09:05:10.160
c; date- 2019-06-27 09:05:10.661
b; date- 2019-06-27 09:05:11.162
c; date- 2019-06-27 09:05:11.669
b; date- 2019-06-27 09:05:12.175
b; date- 2019-06-27 09:05:12.675
b; date- 2019-06-27 09:05:13.176
a; date- 2019-06-27 09:05:13.676
c; date- 2019-06-27 09:05:14.176
b; date- 2019-06-27 09:05:14.677
b; date- 2019-06-27 09:05:15.177
b; date- 2019-06-27 09:05:15.678
c; date- 2019-06-27 09:05:16.178
a; date- 2019-06-27 09:05:16.679
c; date- 2019-06-27 09:05:17.179
c; date- 2019-06-27 09:05:17.680
c; date- 2019-06-27 09:05:18.180
c; date- 2019-06-27 09:05:18.681
c; date- 2019-06-27 09:05:19.181
c; date- 2019-06-27 09:05:19.681
a; date- 2019-06-27 09:05:20.182
c; date- 2019-06-27 09:05:20.682
b; date- 2019-06-27 09:05:21.182
c; date- 2019-06-27 09:05:21.682
b; date- 2019-06-27 09:05:22.183
a; date- 2019-06-27 09:05:22.683
b; date- 2019-06-27 09:05:23.184
a; date- 2019-06-27 09:05:23.684
c; date- 2019-06-27 09:05:24.184
b; date- 2019-06-27 09:05:24.685
b; date- 2019-06-27 09:05:25.186
c; date- 2019-06-27 09:05:25.687
b; date- 2019-06-27 09:05:26.187
a; date- 2019-06-27 09:05:26.687
a; date- 2019-06-27 09:05:27.188
a; date- 2019-06-27 09:05:27.688
b; date- 2019-06-27 09:05:28.188
b; date- 2019-06-27 09:05:28.688
现在我有一个消费者CEPCharEventConsumer.java,它将从 Kafka 主题charEvent读取消息并过滤以字符“ a ”开头的消息。
然后我编写了以下模式来生成 CEP 事件/警报,同时我们发现连续5 条消息,该消息在5秒内以字符“ a ”开头。
Pattern<String, String> pattern = Pattern.<String> begin("start")
.times(5).greedy().where(new SimpleCondition<String>() {
private static final long serialVersionUID = -6301755149429716724L;
@Override
public boolean filter(String value) throws Exception {
return value.split(";")[0].equals("a");
}
}).within(Time.seconds(5));
打印下面由CEPCharEventConsumer.java接收到的以字符“a”开头的消息。
2> a; date- 2019-06-27 09:05:10.160
1> a; date- 2019-06-27 09:05:13.676
3> a; date- 2019-06-27 09:05:16.679
2> a; date- 2019-06-27 09:05:20.182
3> a; date- 2019-06-27 09:05:22.683
1> a; date- 2019-06-27 09:05:23.684
3> a; date- 2019-06-27 09:05:26.687
1> a; date- 2019-06-27 09:05:27.188
1> a; date- 2019-06-27 09:05:27.688
1> a; date- 2019-06-27 09:05:29.198
2> a; date- 2019-06-27 09:05:30.199
1> a; date- 2019-06-27 09:05:33.703
1> a; date- 2019-06-27 09:05:35.203
3> a; date- 2019-06-27 09:05:36.705
2> a; date- 2019-06-27 09:05:38.207
1> a; date- 2019-06-27 09:05:39.709
2> a; date- 2019-06-27 09:05:40.209
3> a; date- 2019-06-27 09:05:40.728
打印的警报消息:
4> Found: a; date- 2019-06-27 09:05:26.687
在上述消息中“ Found: a; date-2019-06-27 09:05:26.687 ”是警报消息。
我无法理解 flink 如何在 5 秒内计算出连续的 5 条消息。我认为那里有问题。
我附上了源代码的GIT URL ( flink-cep-char-event )。谁能按照我的要求把它改正。
解决方案
您的基于 CEP 的应用程序似乎正确报告了这 5 条消息
3> a; date- 2019-06-27 09:05:26.687
1> a; date- 2019-06-27 09:05:27.188
1> a; date- 2019-06-27 09:05:27.688
1> a; date- 2019-06-27 09:05:29.198
2> a; date- 2019-06-27 09:05:30.199
发生在 5 秒的间隔内。
你的processMatch
方法PatternProcessFunction
是通过一个 Map<String, List<String>> match
. 在您的情况下match.get("start")
,返回模式的“开始”子句中的 5 个匹配事件的列表(这是整个模式)。
因此,要生成一份报告,给出最后一个匹配事件的时间,而不是第一个匹配事件的时间,请更改
String start = match.get("start").get(0);
out.collect("Found: " + start);
至
String last = match.get("start").get(4);
out.collect("Found: " + last);
推荐阅读
- node.js - Microsoft Bot Framework 多用户
- python - How to print in one line the outputs of two lists?
- php - how to find operator in sting and calculate as per my formula?
- mysql - Trigger to Insert N number of default values into row
- javascript - 我想给我的每个数组元素一个数字
- excel - Automation of Excel file merging
- java - Java lambda - 具有两个单独条件的过滤器列表
- r - R Studio 中的时间戳
- flutter - 如何在 Flutter 中为自定义画家的颜色设置动画?
- javascript - Loop over Object and return the length of each key cumulatively in an array