Can't force window suppression when using TopologyTestDriver

Problem description

The problematic topology:

builder.<String, String>stream(someTopic)
            .filter((k, v) -> !k.equals("heartbeat"))
            .filter((k, v) -> v != null)
            .filter(this::isRedactedInstance)
            // update our current cache of "known" redacted records
            .peek(this::updateRedactedCache)
            // change key so we can properly do the join further down
            .selectKey(this::getKeyFromRedacted)
            // now we slice the stream into 10 sec long slices and aggregate by key
            // Our redacted comes in every 10 seconds, so we should never get more than one set of redacted per cluster
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ofSeconds(3)))
            .aggregate(() -> null, this::aggregateRedactedByKey)
            // the suppress allows us to ignore intermediate records and only get the final 10 second worth aggregation
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .peek(this::deleteme)
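
To spell out the timing the suppress step depends on (illustration only; the variables below are not part of the topology): a 10-second tumbling window is only eligible to be forwarded by suppress(untilWindowCloses) once stream time, i.e. the highest record timestamp seen so far, has passed the window end plus the 3-second grace period.

// Illustration: when does the window containing a record with timestamp 1 ms close?
long windowSizeMs = Duration.ofSeconds(10).toMillis();      // 10 s tumbling windows
long graceMs = Duration.ofSeconds(3).toMillis();            // 3 s grace period
long recordTs = 1L;
long windowStart = recordTs - (recordTs % windowSizeMs);    // 0
long windowCloseMs = windowStart + windowSizeMs + graceMs;  // 13_000 ms
// Only after a record with a timestamp beyond roughly 13 000 ms has been processed
// should the aggregate for the window [0 s, 10 s) be emitted downstream.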

The unit test used to exercise it is:

TopologyTestDriver driver = new TopologyTestDriver(redacted.getMainTopology(), streamingProperties);
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst1),
        getEntityEventString(inst1, instSeed1), 0L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst2),
        getEntityEventString(inst1, instSeed1), 1L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, getRedactedKey(inst3),
        getEntityEventString(inst3, instSeed3), Duration.ofDays(5).toMillis()));
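
The streamingProperties and redactedRecordFactory used above are not shown; with the pre-2.4 test API this question relies on (org.apache.kafka.streams.test.ConsumerRecordFactory), a typical setup looks roughly like the sketch below. The application id, bootstrap value and String serdes are assumptions, not taken from the question.

// Minimal driver configuration; all values are placeholders.
Properties streamingProperties = new Properties();
streamingProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "redacted-test");
streamingProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
streamingProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamingProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Factory that builds the ConsumerRecords piped into the driver above
// (deprecated in favour of TestInputTopic since Kafka 2.4).
ConsumerRecordFactory<String, String> redactedRecordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());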

1) When I run the test in debug mode with a breakpoint in the "deleteMe" method, the breakpoint is never hit.

2) When I put breakpoints in the "aggregate" method, they are hit as expected (three times).

3) If I step through the test slowly enough, the breakpoint in the "deleteMe" method does get hit.

I have tried advancing the wall-clock time, but as far as I know that is irrelevant to window-based suppression (and in any case it did not work).

I don't know what else to try; I would have expected the third event, with its far-future timestamp, to trigger the suppression to emit.
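
To make that expectation concrete (a sketch only; the dummy key and value below are placeholders and would in practice have to pass the topology's own filters): stream time in Kafka Streams advances only with record timestamps, never with the wall clock, and suppress(untilWindowCloses) forwards a window only once stream time has passed the window end plus the grace period. Piping one more record with a late-enough timestamp, which is exactly what the third record above should already be doing, is normally enough to flush the suppression buffer.

// End of the first window [0 s, 10 s) plus the 3 s grace period.
long windowCloseMs = Duration.ofSeconds(10).plusSeconds(3).toMillis();

// The extra record must survive the upstream filters (non-heartbeat key, non-null
// value, isRedactedInstance) so that it actually reaches the aggregation and the
// suppress node; only then does its timestamp push stream time past the window close.
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC, "dummy-key", "dummy-value", windowCloseMs + 1));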

Tags: java, apache-kafka, apache-kafka-streams

Solution

