首页 > 解决方案 > Kafka Stream 聚合期间的异常是什么意思?

问题描述

我正在 Kafka v1.0 Streams 中编写一个应用程序,当我尝试运行该应用程序时遇到了一个奇怪的运行时问题。简化后的应用程序如下所示:

KStream< String, ValueClass > stream = ...;
KGroupedStream< String, ValueClass > groupedStream = stream.groupByKey();
KTable< String, ValueClass > aggregatedTable = 
    groupedStream
    .aggregate( ()-> new ValueClass(),  // initializer
                ( string, invalue, aggvalue ) -> { 
                     ValueClass aggResult = f( invalue, aggvalue );
                     return aggResult; }, // aggregator
                Materialized.with( Serdes.String(), ValueClassSerde )
              );

我已经通过print( Printed.toSysOut() )操作确认输入 KStream 看起来像我预期的那样,并且添加groupByKey()应用程序仍然有效,但是当我添加聚合操作时,出现运行时错误:

java.lang.ClassCastException: java.lang.String cannot be cast to ValueClass

我已经多次检查该程序,但它对我来说仍然是合理的,我无法弄清楚处理中的什么试图将 aString转换为ValueClass.

你能解释一下错误消息告诉我什么,以及我需要做什么来解决这个问题吗?


下一个问题:我修改了上面的代码,添加了一个将聚合输出表转换为流并打印的步骤:

KStream< String, ValueClass > stream = ...;
KGroupedStream< String, ValueClass > groupedStream = stream.groupByKey();
KTable< String, ValueClass > aggregatedTable = 
    groupedStream
    .aggregate( ()-> new ValueClass(),  // initializer
                ( string, invalue, aggvalue ) -> { 
                     ValueClass aggResult = f( invalue, aggvalue );
                     return aggResult; }, // aggregator
                Materialized.with( Serdes.String(), ValueClassSerde )
              );
aggregatedTable.toStream().print( Printed.toSysOut() );

程序运行,但没有任何结果。此外,当我在 Eclipse 调试器下运行并在聚合函数中放置断点时f,执行永远不会遇到断点。

我读过一些关于 KTable.toStream 不输出每个结果以节省流流量的内容;这就是这里发生的事情,有什么办法可以防止它立即刷新输出?

标签: apache-kafka-streams

解决方案


推荐阅读