apache-kafka-streams - Kafka Stream 聚合期间的异常是什么意思?
问题描述
我正在 Kafka v1.0 Streams 中编写一个应用程序,当我尝试运行该应用程序时遇到了一个奇怪的运行时问题。简化后的应用程序如下所示:
KStream< String, ValueClass > stream = ...;
KGroupedStream< String, ValueClass > groupedStream = stream.groupByKey();
KTable< String, ValueClass > aggregatedTable =
groupedStream
.aggregate( ()-> new ValueClass(), // initializer
( string, invalue, aggvalue ) -> {
ValueClass aggResult = f( invalue, aggvalue );
return aggResult; }, // aggregator
Materialized.with( Serdes.String(), ValueClassSerde )
);
我已经通过print( Printed.toSysOut() )
操作确认输入 KStream 看起来像我预期的那样,并且添加groupByKey()
应用程序仍然有效,但是当我添加聚合操作时,出现运行时错误:
java.lang.ClassCastException: java.lang.String cannot be cast to ValueClass
我已经多次检查该程序,但它对我来说仍然是合理的,我无法弄清楚处理中的什么试图将 aString
转换为ValueClass
.
你能解释一下错误消息告诉我什么,以及我需要做什么来解决这个问题吗?
下一个问题:我修改了上面的代码,添加了一个将聚合输出表转换为流并打印的步骤:
KStream< String, ValueClass > stream = ...;
KGroupedStream< String, ValueClass > groupedStream = stream.groupByKey();
KTable< String, ValueClass > aggregatedTable =
groupedStream
.aggregate( ()-> new ValueClass(), // initializer
( string, invalue, aggvalue ) -> {
ValueClass aggResult = f( invalue, aggvalue );
return aggResult; }, // aggregator
Materialized.with( Serdes.String(), ValueClassSerde )
);
aggregatedTable.toStream().print( Printed.toSysOut() );
程序运行,但没有任何结果。此外,当我在 Eclipse 调试器下运行并在聚合函数中放置断点时f
,执行永远不会遇到断点。
我读过一些关于 KTable.toStream 不输出每个结果以节省流流量的内容;这就是这里发生的事情,有什么办法可以防止它立即刷新输出?
解决方案
推荐阅读
- javascript - 如何排序 sapuit5 列表的顺序?
- c# - 在将通过 IDataReader 获取的数据返回给其他代码之前对其进行预处理
- excel - 跳过某些单元格时的增量编号
- c - 检查信号是否在 SIG_DFL 或 SIG_IGN 上?
- spring - Spring Data gemfire + 无可用服务器异常
- java - Camel-cxf 停止路线关闭巴士
- r - 使用正则表达式检测字母数字字符串的范围
- linux - slurm SBATCH - 多个节点,相同的 SLURMD_NODENAME
- sqlite - 整数除列 - SQLite
- google-apps-script - 如何在谷歌脚本中打印变量