首页 > 解决方案 > KGroupedTable 的计数可以是负数吗?

问题描述

我的代码在 KTable 上应用 groupBy,然后是计数:

KStream<AggregationFields, Long> theCounts = theTable
            .groupBy((key, value) -> {
                AggregationFields af = new AggregationFields(
                        value.getUser(),
                        value.getGroup(),
                        value.getSegment);

                return KeyValue.pair(af, 1L);
            }, Serialized.with(AggregationFields.getSerde(), Serdes.Long()))
            .count()
            .toStream();

在我的生产环境中,我有时会在启动此应用程序时看到计数产生负数,即使我使用应用程序重置工具来确保没有遗留内部主题,以及删除任何本地流状态。是否存在计数可能为负的情况?我做错了吗?

我在 kafka-streams 1.0.1 上(但是,服务器运行的是 1.0 之前的版本,不确定这是否重要)。

标签: apache-kafka-streams

解决方案


每次更新基表时,Kafka Streams 需要向下游发送两条记录来更新计数,因为一般情况下,如果有多个分区,两条更新记录可能会在不同的机器上处理。一个记录是“负”减法记录,第二个记录是潜在不同键的计数的“正”加法记录。

如果对基表的更新没有导致 count() 的键更改,则两条记录将依次处理,如果当前计数为零,我们将在处理减法记录时首先将计数减一,然后之后再次增加计数。对于这种特殊情况,您可能会看到一个否定的中间结果。


推荐阅读