首页 > 解决方案 > Flink 事件时间计时器不会在 testharness 中触发

问题描述

我正在尝试使用 flink 实现基于事件时间特征的翻滚窗口计算。

因此我有一个KeyedBroadcastProcessFunction做所有的工作。使用 ProcessTimers,一切都在单元测试中按预期工作。现在我更改了代码以使用事件计时器,但时间不会触发。(我确实用 registerEventTimeTimer 注册了定时器)

基本上测试是这样的

  @Test
  public void evaluateFormular_ShouldSumOnTimer() throws Exception {
    long minutesToWait = 1;
    var definition = createTestCondition("Test", String.format("%s(_var)", method), "_var", "Test",
        "1", minutesToWait);

    var message = new CalculationControlMessage();
    message.setAction(ControlMessageAction.Create);
    message.setCalculationDefinition(definition);

    harness.processBroadcastElement(message, 100l);
    //harness.processBroadcastWatermark(5l);
    this.processValues(harness, values);
    // advance the watermark so that the timer can fire
    harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);

    assertEquals(harness.numEventTimeTimers(), 1);
    assertEquals("there should be a formular evaluated", 1, harness.extractOutputValues().size());
    harness.extractOutputValues().forEach(datapoint -> {
      assertEquals(datapoint.getValue(), expected, 0d);
    });
  }

据我了解,水印是手动高级的,以便计时器可以触发。水印如果绝对高于事件的水印。

线束是这样设置的

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.util.KeyedBroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;

public class FlinkHarness {
  public static KeyedBroadcastOperatorTestHarness<String, DataPointEvent, CalculationControlMessage, DataPointEvent> createForCalculations()
      throws Exception {
    var dynamicCalculationFunction = new DynamicCalculationFunction();
    
    var harness = ProcessFunctionTestHarnesses
        .forKeyedBroadcastProcessFunction(dynamicCalculationFunction, new KeySelector<>() {
          private static final long serialVersionUID = 1337L;

          @Override
          public String getKey(DataPointEvent value) throws Exception {
            return value.getDataPointKey();
          }

        }, Types.STRING, CalculationDescriptors.calculation);

    harness.setTimeCharacteristic(TimeCharacteristic.EventTime);
    harness.getExecutionConfig().setAutoWatermarkInterval(50l);
    harness.open();
    return harness;
  }

如果我调试该函数,我可以看到测试使用的 timerServicecurrentWatermark仍然在-9223372036854775808

我不明白为什么计时器不触发。我错过了什么吗?

标签: javaapache-flink

解决方案


您没有共享足够的代码来调试它,但我可以指出一些事情:

this.processValues(harness, values);

这些值 StreamRecords 是否已分配时间戳?

harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);

水印是绝对时间戳。这将是一个非常小的水印;不大于零。除非您的时间戳从零开始,否则这是行不通的。

assertEquals(harness.numEventTimeTimers(), 1);

通过水印发送来触发定时器应该会减少定时器的数量。为什么这不是零?


推荐阅读