首页 > 解决方案 > FlinkCep:IterativeCondition而不是SimpleCondition不匹配模式

问题描述

尝试匹配 Pattern 序列中的某些元素时遇到问题。Flink 1.11.1 版本。这是我的情况:

final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();    
DataStream<Model> inputStream = env.fromElements(
            Model.of(1, "A", "US"),
            Model.of(2, "B", "US"),
            Model.of(3, "C", "US"),
            Model.of(4, "A", "AU"),
            Model.of(5, "B", "AU"),
            Model.of(6, "C", "AU"),
          //Model.of(7, "D"),
            Model.of(8, "D", "AU"),
            Model.of(9, "A", "GB"),
            Model.of(10, "B", "GB"),
            Model.of(13, "D", "GB"),
            Model.of(11, "C", "GB"),
            Model.of(12, "D", "GB")


    ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
            .forceNonParallel();

    Pattern<Model, Model> pattern = Pattern.<Model>begin("start", AfterMatchSkipStrategy.skipToNext())
            .where(new IterativeCondition<Model>() {
                @Override
                public boolean filter(Model value, Context<Model> ctx) throws Exception {
                    return value.getText().equalsIgnoreCase("A");
                }
            }).followedBy("second")
            .where(new IterativeCondition<Model>() {
                @Override
                public boolean filter(Model value, Context<Model> ctx) throws Exception {
                  
                    return value.getText().equalsIgnoreCase("B");
                }
            }).followedBy("third")
            .where(new IterativeCondition<Model>() {
                @Override
                public boolean filter(Model value, Context<Model> ctx) throws Exception {
                 
                    return value.getText().equalsIgnoreCase("C");
                }
            }).followedBy("fourth")
            .where(new IterativeCondition<Model>() {
                @Override
                public boolean filter(Model value, Context<Model> ctx) throws Exception {
                    var  list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false).collect(Collectors.toList());
                    var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
                    return value.getText().equalsIgnoreCase("D") && val;
                }
            });
            
            
            PatternStream<Model> marketOpenPatternStream = CEP.<Model>pattern(inputStream, pattern);
            
             SingleOutputStreamOperator<List<Model>> marketOpenOutput =
            marketOpenPatternStream
                    .process(new PatternProcessFunction<Model, List<Model>>() {
                        @Override
                        public void processMatch(Map<String, List<Model>> match, Context ctx, Collector<List<Model>> out) throws Exception {
                            System.out.println(match);
                            out.collect(new ArrayList(match.values()));
                        }
                    })

我想要成功的是只匹配具有相同符号的模式。如果我使用 SimpleCondition 仅检查 Model(A, B,C..) 的文本,而在最后一个模式中没有符号检查,则模式序列得到满足,我得到以下输出:

{start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B', 
symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], fourth=[Model{id=8,text='D',symbol='AU'}]}
    
{start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], fourth=[Model{id=8, text='D', symbol='AU'}]}
    
{start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D', symbol='GB'}]}

但是我想避免 id= 1(A),2(B),3(C) 的元素与 id = 8(D) 的元素匹配。出于这个原因,我将符号检查与上一个模式中匹配的事件放在最后一个条件中,所以我没有得到匹配,因为它们没有相同的符号。但是在应用条件之后,现在我没有得到任何输出。没有任何元素与模式匹配。我错过了什么?有人可以帮忙吗?

标签: apache-flinkcomplex-event-processingflink-cep

解决方案


推荐阅读