首页 > 解决方案 > Kafka Streams物化商店构建错误

问题描述

我正在尝试在这里构建 Materialized.as DSL 代码:https ://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/Stores.html

但我得到了错误

incompatible types: org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted to org.apache.kafka.common.serialization.Serde<java.lang.Object>

在线上

.withKeySerde(Serdes.Long())

有谁知道这里可能出了什么问题?

final StreamsBuilder builder = new StreamsBuilder();

   KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
   KTable<Long,String> dataStore = builder.table(
     "example_stream",
     Materialized.as(storeSupplier)
             .withKeySerde(Serdes.Long())
             .withValueSerde(Serdes.String()));

标签: javaapache-kafkaapache-kafka-streams

解决方案


问题是builder.table不知道泛型类型默认为<Object,Object>. 后来,Serde 类型不匹配。您需要指定类型,例如

KTable<Long,String> dataStore = builder.<Long,String>table(
    "example_stream",
    Materialized.as(storeSupplier)
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.String()));

推荐阅读