首页 > 解决方案 > Kafka Streams聚合加法器运算符问题

问题描述

这是我的 Kafka Streams 代码:

KTable<String, Long> bankTransactionsStream = streamsIn
            .groupByKey()
            .aggregate(
                    ()-> 0L,
                    (aggKey, newValue, aggValue) -> aggValue + newValue,
                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots").withKeySerde(Serdes.String()).
                            withValueSerde(Serdes.String())
            );

我通过键对流进行分组,然后我想聚合以将流的每个新值 ( newValue) 添加到聚合值 ( aggValue)。我正在关注官方的 confluent 文档,但突出显示了两个错误,它们都与提供的 VR 数据类型有关:

  1. 在加法器上(aggKey, newValue, aggValue) -> aggValue + newValue必需类型:KTable <String,Long> 提供:KTable<String,VR> 不存在类型变量的实例,因此 Long 符合字符串推理变量 VR 具有不兼容的边界:等式约束:字符串下限:长
  2. 在初始化器 ( ()-> 0L) 上:必需类型:KTable <String,Long>,提供:KTable <String, VR> 不兼容的等式约束:KeyValueStore<Bytes, byte[]> 和 StateStore

我不明白为什么它假设我提供 VR 类型,有什么想法吗?

标签: apache-kafkaapache-kafka-streamsjava-11

解决方案


推荐阅读