apache-flink - FlinkCep:IterativeCondition而不是SimpleCondition不匹配模式
问题描述
尝试匹配 Pattern 序列中的某些元素时遇到问题。Flink 1.11.1 版本。这是我的情况:
final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
DataStream<Model> inputStream = env.fromElements(
Model.of(1, "A", "US"),
Model.of(2, "B", "US"),
Model.of(3, "C", "US"),
Model.of(4, "A", "AU"),
Model.of(5, "B", "AU"),
Model.of(6, "C", "AU"),
//Model.of(7, "D"),
Model.of(8, "D", "AU"),
Model.of(9, "A", "GB"),
Model.of(10, "B", "GB"),
Model.of(13, "D", "GB"),
Model.of(11, "C", "GB"),
Model.of(12, "D", "GB")
).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
.forceNonParallel();
Pattern<Model, Model> pattern = Pattern.<Model>begin("start", AfterMatchSkipStrategy.skipToNext())
.where(new IterativeCondition<Model>() {
@Override
public boolean filter(Model value, Context<Model> ctx) throws Exception {
return value.getText().equalsIgnoreCase("A");
}
}).followedBy("second")
.where(new IterativeCondition<Model>() {
@Override
public boolean filter(Model value, Context<Model> ctx) throws Exception {
return value.getText().equalsIgnoreCase("B");
}
}).followedBy("third")
.where(new IterativeCondition<Model>() {
@Override
public boolean filter(Model value, Context<Model> ctx) throws Exception {
return value.getText().equalsIgnoreCase("C");
}
}).followedBy("fourth")
.where(new IterativeCondition<Model>() {
@Override
public boolean filter(Model value, Context<Model> ctx) throws Exception {
var list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false).collect(Collectors.toList());
var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
return value.getText().equalsIgnoreCase("D") && val;
}
});
PatternStream<Model> marketOpenPatternStream = CEP.<Model>pattern(inputStream, pattern);
SingleOutputStreamOperator<List<Model>> marketOpenOutput =
marketOpenPatternStream
.process(new PatternProcessFunction<Model, List<Model>>() {
@Override
public void processMatch(Map<String, List<Model>> match, Context ctx, Collector<List<Model>> out) throws Exception {
System.out.println(match);
out.collect(new ArrayList(match.values()));
}
})
我想要成功的是只匹配具有相同符号的模式。如果我使用 SimpleCondition 仅检查 Model(A, B,C..) 的文本,而在最后一个模式中没有符号检查,则模式序列得到满足,我得到以下输出:
{start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B',
symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], fourth=[Model{id=8,text='D',symbol='AU'}]}
{start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
{start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', symbol='GB'}]}
但是我想避免 id= 1(A),2(B),3(C) 的元素与 id = 8(D) 的元素匹配。出于这个原因,我将符号检查与上一个模式中匹配的事件放在最后一个条件中,所以我没有得到匹配,因为它们没有相同的符号。但是在应用条件之后,现在我没有得到任何输出。没有任何元素与模式匹配。我错过了什么?有人可以帮忙吗?
解决方案
推荐阅读
- aws-lambda - botocore.exceptions.ClientError:调用CreateBucket操作时发生错误(AccessDenied):访问被拒绝
- wix - WiX 组件 GUID 可以在不同的产品安装中复制吗
- javascript - 将 2 个应用程序合并在一起 (ASP.NET MVC - JavaScript)
- javascript - 数组在订阅范围之外变得未定义
- javascript - 是否可以使用 Google Map JavaScript API/JavaScript 在地图上放置一个大圆圈?
- python - 是否有任何不使用 Java 的 mdbtools 替代品?
- matlab - Octave:向/从文本文件写入/读取 f(x,y) 出错
- batch-file - 如何在 bat 中处理下一个命令之前等待应用程序结束?
- c# - 如何从 DecryptStringFromBytes_Aes 获得与 .NET C# 但在 Node js 中相同的结果?
- php - 是否可以在 symfony 5 中自定义违反约束错误渲染?