首页 > 解决方案 > nullpointer exception in aggregate operation in kafka stream

问题描述

below is code snippet,

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test"); 
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerIP:port"); 
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

StreamBuilder builder = new StreamBuilder(); 

KStream streamData = builder.stream(inputTopicName);

streamData.groupByKey(Grouped.with(jsonSerde,jsonSerde)) 
    .aggregate( //some transformation );

KafkaStreams kafkaStreams = new KafkaStreams(
    builder.build(streamConfiguration), 
    streamConfiguration
);

here we are not using session window and this snippet gives me perfect result. But when I introduce session window with this stream then it gives null pointer exception for aggregate function.

can anybody help here

标签: apache-kafka-streams

解决方案


推荐阅读