java - Flink 事件时间计时器不会在 testharness 中触发
问题描述
我正在尝试使用 flink 实现基于事件时间特征的翻滚窗口计算。
因此我有一个KeyedBroadcastProcessFunction
做所有的工作。使用 ProcessTimers,一切都在单元测试中按预期工作。现在我更改了代码以使用事件计时器,但时间不会触发。(我确实用 registerEventTimeTimer 注册了定时器)
基本上测试是这样的
@Test
public void evaluateFormular_ShouldSumOnTimer() throws Exception {
long minutesToWait = 1;
var definition = createTestCondition("Test", String.format("%s(_var)", method), "_var", "Test",
"1", minutesToWait);
var message = new CalculationControlMessage();
message.setAction(ControlMessageAction.Create);
message.setCalculationDefinition(definition);
harness.processBroadcastElement(message, 100l);
//harness.processBroadcastWatermark(5l);
this.processValues(harness, values);
// advance the watermark so that the timer can fire
harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);
assertEquals(harness.numEventTimeTimers(), 1);
assertEquals("there should be a formular evaluated", 1, harness.extractOutputValues().size());
harness.extractOutputValues().forEach(datapoint -> {
assertEquals(datapoint.getValue(), expected, 0d);
});
}
据我了解,水印是手动高级的,以便计时器可以触发。水印如果绝对高于事件的水印。
线束是这样设置的
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.util.KeyedBroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
public class FlinkHarness {
public static KeyedBroadcastOperatorTestHarness<String, DataPointEvent, CalculationControlMessage, DataPointEvent> createForCalculations()
throws Exception {
var dynamicCalculationFunction = new DynamicCalculationFunction();
var harness = ProcessFunctionTestHarnesses
.forKeyedBroadcastProcessFunction(dynamicCalculationFunction, new KeySelector<>() {
private static final long serialVersionUID = 1337L;
@Override
public String getKey(DataPointEvent value) throws Exception {
return value.getDataPointKey();
}
}, Types.STRING, CalculationDescriptors.calculation);
harness.setTimeCharacteristic(TimeCharacteristic.EventTime);
harness.getExecutionConfig().setAutoWatermarkInterval(50l);
harness.open();
return harness;
}
如果我调试该函数,我可以看到测试使用的 timerServicecurrentWatermark
仍然在-9223372036854775808
我不明白为什么计时器不触发。我错过了什么吗?
解决方案
您没有共享足够的代码来调试它,但我可以指出一些事情:
this.processValues(harness, values);
这些值 StreamRecords 是否已分配时间戳?
harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);
水印是绝对时间戳。这将是一个非常小的水印;不大于零。除非您的时间戳从零开始,否则这是行不通的。
assertEquals(harness.numEventTimeTimers(), 1);
通过水印发送来触发定时器应该会减少定时器的数量。为什么这不是零?
推荐阅读
- json - 如何将 2 个 json 合并为一个?我需要将它们合并为字符串吗?
- excel - 选择相对于活动单元格的多行
- python - 将 cv2 摄像头集成到 kv 中作为 Screen
- azure-logic-apps - 使用自定义 URL 公开 AS2 逻辑应用
- javascript - 如何动态更改跨度标签的颜色?
- sql - SQL:根据唯一 ID 更新数百行
- reactjs - 我正在尝试通过赛普拉斯中的多个测试来保留会话 cookie,但 Cypress.Cookie.preserveOnce() 不起作用
- ios - 与 Apple 测试用户一起登录
- python - 为什么我的打字机打印的每个字符都换行?
- scala - scala 2.13 - 编译插件时出错