apache-kafka - KGroupedStream 与 cogroup、聚合和抑制
问题描述
试图联合、聚合和抑制流,这会导致 -
Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier (org.apache.kafka.streams.kstream.internals.PassThrough and org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in unnamed module of loader 'app')
val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
.groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
val streams2 = confirmationStreams
.groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))
val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator)
.windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
.aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
.withValueSerde(serdesConfig.notificationMetricSerde()))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
但是,如果我删除它,.suppress
它工作正常。
解决方案
推荐阅读
- java - Jersey 客户端如何在不引起内存问题的情况下处理巨大的负载?
- java - 网络爬虫与 Html 解析器
- r - 仅在 r 中替换第一列中的特殊字符
- kubernetes - 如何在 App Engine 或 Kubernetes 上运行 NLTK?
- c# - 天蓝色函数 queueTrigger 错误 - Microsoft Azure WebJobs SDK '[Hidden Credential]' 连接字符串丢失或为空
- javascript - 如何使用链接打开引导选项卡?
- dll - CAD 程序“FiDES”无法在笔记本电脑上启动,出现 0xc000007b 错误并丢失 .dll 文件
- if-statement - 谷歌脚本 .getvalue() 不能处理带有公式的单元格
- maven - 运行 mvn test -Dtest=.. 时找不到文件
- docker - 在 cpanel 上运行 docker 容器