java - testHarness timers not deleted in Flink 1.8.2
Problem description
I am using testHarness to test my custom trigger. A simplified snippet is attached below:
public class CustomTrigger extends Trigger<InputPOJO, TimeWindow> {

    // Stores the timestamp of the previously registered timer.
    private final ReducingStateDescriptor<Long> previousTriggerDesc =
            new ReducingStateDescriptor<>("previous-trigger", new Max(), LongSerializer.INSTANCE);
    private final long allowedLatenessMillis;

    public CustomTrigger(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    @Override
    public TriggerResult onElement(InputPOJO inputPOJO, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> previousTriggerState = ctx.getPartitionedState(previousTriggerDesc);
        Long previousTriggerTime = previousTriggerState.get();
        // Remove the previous timer; otherwise the trigger would be invoked twice.
        if (previousTriggerTime != null) {
            ctx.deleteProcessingTimeTimer(previousTriggerTime); // NOTE
            System.out.println("deleteProcessingTimeTimer(previousTriggerTime)" + previousTriggerTime); // Invoked
        }
        // Register a new timer for the current InputPOJO.
        long currentTriggerTime = ctx.getCurrentProcessingTime() + allowedLatenessMillis;
        ctx.registerProcessingTimeTimer(currentTriggerTime);
        // Remember currentTriggerTime so the next element can delete this timer.
        previousTriggerState.add(currentTriggerTime);
        return TriggerResult.CONTINUE;
    }
    ...
}
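In the custom trigger, I register a new timer for every new InputPOJO. When I register that timer, I delete the previous one (based on previousTimerTriggerTime, which is kept in the reducing state).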
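The intended bookkeeping can be modelled without Flink. The sketch below is a simplified, hypothetical model (the class `SingleTimerModel` and its methods are not Flink APIs) that keeps a set of pending timer timestamps and mimics what `onElement` is meant to do when every call sees the same window.

```java
import java.util.TreeSet;

/** Hypothetical, Flink-free model of the "replace the previous timer" bookkeeping. */
public class SingleTimerModel {
    private final TreeSet<Long> timers = new TreeSet<>(); // pending timer timestamps
    private final long allowedLatenessMillis;
    private Long previousTriggerTime; // plays the role of the "previous-trigger" state

    public SingleTimerModel(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    /** Mimics onElement: drop the previous timer, then register a new one. */
    public void onElement(long processingTime) {
        if (previousTriggerTime != null) {
            timers.remove(previousTriggerTime);
        }
        long currentTriggerTime = processingTime + allowedLatenessMillis;
        timers.add(currentTriggerTime);
        // Same effect as a max reducer: keep the largest timestamp seen so far.
        previousTriggerTime = previousTriggerTime == null
                ? currentTriggerTime
                : Math.max(previousTriggerTime, currentTriggerTime);
    }

    public TreeSet<Long> pendingTimers() {
        return timers;
    }

    public static void main(String[] args) {
        SingleTimerModel model = new SingleTimerModel(3_000L);
        model.onElement(1_000L); // registers 4000
        model.onElement(2_000L); // removes 4000, registers 5000
        System.out.println(model.pendingTimers()); // prints [5000]
    }
}
```

In this model exactly one trigger timer is pending after the second element, which is the behavior the test expects from the real trigger.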
I am testing the timer count (together with the window) using the following snippet.
private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;
private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {
    KeySelector<InputPOJO, String> keySelector = InputPOJO::getKey;
    TypeInformation<InputPOJO> inputTypeInfo = TypeInformation.of(new TypeHint<InputPOJO>() {}); // Any suggestion?
    ListStateDescriptor<InputPOJO> stateDesc = new ListStateDescriptor<>("window-contents", inputTypeInfo.createSerializer(new ExecutionConfig())); // Any suggestion?
    /**
     * Creates the WindowOperator for the pipeline below.
     *
     * <pre>
     *
     * DataStream<OutputPOJO> OutputPOJOStream =
     *     inputPOJOStream
     *         .keyBy(InputPOJO::getKey)
     *         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *         .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *         .process(new CustomWindowFunction(windowListStateTtlMillis));
     * </pre>
     */
    customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));
    WindowOperator<String, InputPOJO, Iterable<InputPOJO>, OutputPOJO, TimeWindow> operator =
            new WindowOperator<>(
                    // .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
                    ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
                    new TimeWindow.Serializer(),
                    // .keyBy(InputPOJO::getKey)
                    keySelector,
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    stateDesc,
                    // .process(new CustomWindowFunction(windowListStateTtlMillis))
                    new InternalIterableProcessWindowFunction<>(customWindowFunction),
                    // .trigger(new CustomTrigger(allowedLateness))
                    new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
                    0,     // allowedLateness
                    null); // lateDataOutputTag
    // Create the test harness for the window operator.
    testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);
    // Set up and open the test harness.
    testHarness.setup();
    testHarness.open();
}
@Test
public void test_allowedLateness_extension_on_second_pojo() throws Exception {
    int allowedLatenessSeconds = 3;
    int listStateTTL = 10;
    // 1. Arrange
    InputPOJO listStateInput1 = new InputPOJO(1, "Arjun");
    InputPOJO listStateInput2 = new InputPOJO(2, "Arun");
    // 2. Act
    // listStateInput1 arrives at 1 sec.
    testHarness.setProcessingTime(secondsToMillis(1));
    testHarness.processElement(new StreamRecord<>(listStateInput1));
    // listStateInput2 arrives at 2 sec, i.e. within the allowedLateness period of listStateInput1.
    // (The original snippet passed listStateInput1 here again; both inputs are assumed to map to the same key.)
    testHarness.setProcessingTime(secondsToMillis(2));
    testHarness.processElement(new StreamRecord<>(listStateInput2));
    // Expectation: listStateInput2 deletes the existing, not yet fired timer of listStateInput1 and registers a new timer.
    // Actual: listStateInput2 registers a new timer and the total count is 3.
    // NOTE:
    // 1. I am using a SessionWindow here, so by default one timer is registered for the session gap.
    // 2. The second timer should be the one registered for the InputPOJO.
    Assert.assertEquals(2, testHarness.numProcessingTimeTimers()); // FAILS
}
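The first NOTE in the test relies on how session windows behave. As a Flink-free sketch (the `Window` class below is a hypothetical stand-in for `TimeWindow`, and the 10-second gap is an assumed value because `triggerMaximumTimeoutSeconds` is not shown), each element gets a window `[t, t + gap)` and overlapping windows are merged into their cover:

```java
/** Hypothetical stand-in for Flink's TimeWindow, used only to illustrate session merging. */
public class SessionMergeSketch {

    static final class Window {
        final long start;
        final long end; // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        /** Smallest window that contains both this window and the other one. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** A processing-time session assigner gives each element the window [now, now + gap). */
    static Window assign(long processingTime, long gapMillis) {
        return new Window(processingTime, processingTime + gapMillis);
    }

    public static void main(String[] args) {
        long gapMillis = 10_000L; // assumed value
        Window first = assign(1_000L, gapMillis);
        Window second = assign(2_000L, gapMillis);
        Window merged = first.intersects(second) ? first.cover(second) : second;
        System.out.println(first + " + " + second + " -> " + merged);
        // prints [1000, 11000) + [2000, 12000) -> [1000, 12000)
    }
}
```

Under this model the second element is handled in a larger window than the one seen by the first call to `onElement`, which matters if timers are tracked per window.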
Here, the call ctx.deleteProcessingTimeTimer(previousTriggerTime); is invoked, but the timer count in the testHarness still shows 3.
Is this a bug in the testHarness?
Please suggest a way to test the timer count using the testHarness.
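PS:
Although this looks like the typical behavior of SessionWindow.Gap(), I use this custom trigger in a more complex computation. For simplicity, I reduced the logic to the above.
I use a ListStateDescriptor when creating the WindowOperator for the testHarness.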
Solution
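One possible explanation, offered as a hedged sketch rather than a confirmed answer: Flink's internal timer service stores timers per key and namespace, and for a window operator the namespace is the window. With session windows the second element is processed under the merged window, so a delete issued there would not match a timer that was registered under the original window. The Flink-free model below (all names are hypothetical, and the 10-second gap is an assumed value) reproduces the count of 3 under those assumptions.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model: timers are identified by (window namespace, timestamp). */
public class NamespacedTimerModel {
    private final Set<String> timers = new HashSet<>();

    private static String id(String window, long time) {
        return window + "@" + time;
    }

    public void register(String window, long time) {
        timers.add(id(window, time));
    }

    /** Deleting only matches a timer registered under the same window. */
    public void delete(String window, long time) {
        timers.remove(id(window, time));
    }

    public int count() {
        return timers.size();
    }

    /** Replays the test; cleanUpOnMerge models a trigger that deletes its timer for the old window. */
    public static int replay(boolean cleanUpOnMerge) {
        NamespacedTimerModel model = new NamespacedTimerModel();
        String first = "[1000,11000)";  // window of the first element (assumed 10 s gap)
        String merged = "[1000,12000)"; // window after merging in the second element

        // Element 1 at t=1000: window cleanup timer plus the trigger's timer.
        model.register(first, 10_999L);
        model.register(first, 4_000L);

        // Element 2 at t=2000: the windows merge and the old cleanup timer is replaced.
        model.delete(first, 10_999L);
        if (cleanUpOnMerge) {
            model.delete(first, 4_000L);
        }
        model.register(merged, 11_999L);

        // onElement now runs under the merged window.
        model.delete(merged, 4_000L); // no match: 4000 was registered under `first`
        model.register(merged, 5_000L);
        return model.count();
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // prints 3
        System.out.println(replay(true));  // prints 2
    }
}
```

If this is what happens, the leftover timer belongs to a window that no longer exists after the merge. Whether it matters beyond the count reported by numProcessingTimeTimers() would need to be checked against the WindowOperator source for Flink 1.8.2.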