testHarness timers not deleted in Flink 1.8.2

Problem description

I am using testHarness to test my custom trigger. A simplified snippet is attached below:

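import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomTrigger extends Trigger<InputPOJO, TimeWindow> {

    // Stores the time of the most recently registered timer.
    // Max (not shown) is the ReduceFunction<Long> used to combine values.
    private final ReducingStateDescriptor<Long> previousTriggerDesc =
        new ReducingStateDescriptor<>("previous-trigger", new Max(), LongSerializer.INSTANCE);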

    private final long allowedLatenessMillis;


    public CustomTrigger(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }


    @Override
    public TriggerResult onElement(InputPOJO inputPOJO, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

        ReducingState<Long> previousTriggerState = ctx.getPartitionedState(previousTriggerDesc);
        Long previousTriggerTime = previousTriggerState.get();

        // Remove the previously registered timer; otherwise the trigger would fire twice.
        if (previousTriggerTime != null) {
            ctx.deleteProcessingTimeTimer(previousTriggerTime); //NOTE
            System.out.println("deleteProcessingTimeTimer(previousTriggerTime)"+previousTriggerTime); // Invoked
        }

        // Register a new timer for the current InputPOJO.
        long currentTriggerTime = ctx.getCurrentProcessingTime() + allowedLatenessMillis;
        ctx.registerProcessingTimeTimer(currentTriggerTime);

        // Remember currentTriggerTime so that the next element can delete this timer.
        previousTriggerState.add(currentTriggerTime);

        return TriggerResult.CONTINUE;
    }

    ...
}

In the custom trigger, I register a new timer for every new InputPOJO. Before registering it, I delete the previous timer (based on previousTriggerTime, which is stored in a reducing state).
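To make the intended behavior concrete, here is a minimal plain-Java sketch of the delete-then-register bookkeeping, independent of Flink. `TimerBook`, `onElement` and `numTimers` are hypothetical names used only for illustration, not Flink APIs. The sketch models a single key and a single window, so it says nothing about how Flink scopes timers to window namespaces or how session windows merge.

```java
import java.util.TreeSet;

/** Hypothetical stand-in for a timer service; this is NOT a Flink class. */
class TimerBook {
    // Pending timers, ordered by firing time.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Mirrors the "previous-trigger" state: time of the last registered timer.
    private Long previousTriggerTime;

    /** Deletes the previous timer, then registers one at now + allowedLatenessMillis. */
    long onElement(long now, long allowedLatenessMillis) {
        if (previousTriggerTime != null) {
            timers.remove(previousTriggerTime);
        }
        long currentTriggerTime = now + allowedLatenessMillis;
        timers.add(currentTriggerTime);
        previousTriggerTime = currentTriggerTime;
        return currentTriggerTime;
    }

    /** Number of timers that are still pending. */
    int numTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerBook book = new TimerBook();
        book.onElement(1000L, 3000L); // first element at 1 s
        book.onElement(2000L, 3000L); // second element at 2 s replaces the timer
        System.out.println(book.numTimers());
    }
}
```

With two elements at 1 s and 2 s and an allowed lateness of 3 s, only the timer at 5000 ms remains in this model. Together with the one timer for the session gap, that is where the expected count of 2 in the test below comes from.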

I am testing the timer count (together with the window) using the following snippet.

private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;

private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {

    KeySelector<InputPOJO, String> keySelector = InputPOJO::getKey;

    TypeInformation<InputPOJO> STRING_INT_TUPLE = TypeInformation.of(new TypeHint<InputPOJO>() {}); // Any suggestion ?

    ListStateDescriptor<InputPOJO> stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?

    /**
     * Creating windowOperator for the below function
     *
     * <pre>
     *
     *      DataStream<OutputPOJO> OutputPOJOStream =
     *         inputPOJOStream
     *             .keyBy(InputPOJO::getKey)
     *             .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *             .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *             .process(new CustomWindowFunction(windowListStateTtlMillis));
     * </pre>
     */
    customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));

    WindowOperator<String, InputPOJO, Iterable<InputPOJO>, OutputPOJO, TimeWindow>
        operator =
            new WindowOperator<>(
                // setting .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
                ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
                new TimeWindow.Serializer(),
                // setting .keyBy(InputPOJO::getKey)
                keySelector,
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                // setting .process(new CustomWindowFunction(windowListStateTtlMillis))
                new InternalIterableProcessWindowFunction<>(customWindowFunction),
                // setting .trigger(new CustomTrigger(allowedLateness))
                new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
                0,
                null);

    // Creating testHarness for window operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);

    // Set up and open the test harness
    testHarness.setup();

    testHarness.open();
}


@Test
public void test_allowedLateness_extension_on_second_pojo() throws Exception {

    int allowedLatenessSeconds = 3;
    int listStateTTL = 10;

    //1. Arrange
    InputPOJO listStateInput1 = new InputPOJO(1,"Arjun");
    InputPOJO listStateInput2 = new InputPOJO(2,"Arun");


    // 2. Act
    // listStateInput1 comes at 1 sec
    testHarness.setProcessingTime(secondsToMillis(1));
    testHarness.processElement(new StreamRecord<>(listStateInput1));

    // listStateInput2 comes at 2 sec, i.e. within the allowedLateness period of listStateInput1
    testHarness.setProcessingTime(secondsToMillis(2));
    testHarness.processElement(new StreamRecord<>(listStateInput2));

    // Expectation : listStateInput2 deletes the existing untriggered timer of listStateInput1 and registers a new timer. 
    // Actual: listStateInput2 registered a new timer and the total count is 3.
    // NOTE: 
    // 1. Here I am using SessionWindow, so by default 1 timer would be registered for SessionGap.
    // 2. Second timer should be the InputPOJO registered timer.
    Assert.assertEquals(2, testHarness.numProcessingTimeTimers()); // FAILS

}

Here, ctx.deleteProcessingTimeTimer(previousTriggerTime); is invoked, but the timer count in testHarness still shows 3.

  1. Is this a bug in testHarness?

  2. Please suggest a way to test the timer count with testHarness.

PS:

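  1. Although this looks like the typical behavior of SessionWindow.Gap(), I use this custom trigger in a more complex computation. For simplicity, I have reduced the logic to the above.

  2. I am using ListStateDescriptor when creating the WindowOperator for testHarness.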

Tags: java, apache-flink, flink-streaming

Solution

