java - 使用 TopologyTestDriver 时无法强制窗口抑制
问题描述
有问题的拓扑:
builder.<String, String>stream(someTopic)
.filter((k, v) -> !k.equals("heartbeat"))
.filter((k, v) -> v != null)
.filter(this::isRedactedInstance)
// update our current cache of "known" redacted records
.peek(this::updateRedactedCache)
// change key so we can properly do the join further down
.selectKey(this::getKeyFromRedacted)
// now we slice the stream into 10 sec long slices and aggregate by key
// Our redacted comes in every 10 seconds, so we should never get more than one set of redacted per cluster
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ofSeconds(3)))
.aggregate(() -> null, this::aggregateRedactedByKey)
// the suppress allows us to ignore intermediate records and only get the final 10 second worth aggregation
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.peek(this::deleteme)
用于测试的单元测试是:
TopologyTestDriver driver = new TopologyTestDriver(redacted.getMainTopology(), streamingProperties);
driver.pipeInput(
redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst1),
getEntityEventString(inst1, instSeed1), 0L));
driver.pipeInput(
redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst2),
getEntityEventString(inst1, instSeed1), 1L));
driver.pipeInput(
redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst3),
getEntityEventString(inst3, instSeed3), Duration.ofDays(5).toMillis()));
1)当我在调试模式下运行测试时,在“deleteMe”方法处有一个断点,它永远不会被命中。
2)当我在“聚合”方法中添加断点时,它们按预期命中(三次)
3)如果我足够慢地通过测试,我确实会在“deleteMe”方法中遇到断点。
我曾尝试推进挂钟,但我知道这与窗口抑制无关(此外它也不起作用)。
我不知道还有什么可以尝试的——我本来预计第三个事件,具有超长的时间戳,会触发抑制。
解决方案
推荐阅读
- excel - How do I manipulate databases in excel through sqlite or transfer databases from one to another (both from and to excel and sqlite)?
- c# - #Dotnet EF Core migration issue Foreign key constraint is incorrectly formed
- symfony - Symfony Messenger 4.3 - 从理论传输中消费消息失败(抛出异常)
- python - 如何使用 pandas read_gbq 防止 SQL 注入
- python - 如何在 Python 中创建一个简单的标志
- json - 验证预期值的 JSON 模式
- c - 收到错误“Main.c:9:22: error: expected ';' 在 C 编程中的“for”语句说明符中
- azure-devops - 在 Azure DevOps 或 Team City 中,什么是构建代理?
- node.js - 带有文件和正文的后控制器
- php - SQLSTATE [HY000]:一般错误:1364 字段“用户名”没有默认值