scala - 在 KStreams 中聚合实时数据
问题描述
我想根据指定的键对一列数据求和。Stream 就像 id(String) Key, value(Long)。
val aggtimelogs: KTable[String, java.lang.Long] = stream
.groupByKey()
.aggregate(
() => 0L,
(key: String, value: java.lang.Long, aggregate: java.lang.Long) => value + aggregate)
//这里失败
得到
Unspecified value parameters: : Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]
在 Scala 中怎么做?
卡夫卡版本是
compile "org.apache.kafka:kafka-clients:2.0.0"
compile (group: "org.apache.kafka", name: "kafka-streams", version: "2.0.0"){
exclude group:"com.fasterxml.jackson.core"
}
即使我试过这个
val reducer = new Reducer[java.lang.Long]() {
def apply(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
}
val agg = stream
.groupByKey()
.reduce(reducer)
还有这个
val reducer : Reducer[Long] = (value1: Long, value2: Long) => value1 + value2
说
StreamAggregation.scala:39: type mismatch;
found : (Long, Long) => Long
required: org.apache.kafka.streams.kstream.Reducer[Long]
val reducer : Reducer[Long] = (value1: Long, value2: Long) => value1 + value2
解决方案
我是这样做的
val aggVal = streams.groupByKey().reduce(new Reducer[Double]() {
def apply(val1: Double, val2: Double): Double = val1 + val2
})
推荐阅读
- python - 用 Python 创建一个基本的二十一点游戏 - 努力分配和总卡值
- node.js - res.download() throw 请求中止
- python - python3.5中无法导入kicad的pcbnew
- r - 在ggplot堆积条形图中将标签添加到整个条形图
- python - 无法打开文件“pip”:[Errno 2] 没有这样的文件或目录
- sql - SQL 识别 Union
- sql-server - 如何在 SQL Server 数据库的多个表中查找匹配的列?
- mysql - 使用每个表的最高 id 将两个表中的信息合并为一个
- mdm - 是否有用于 Intune 的 CSP 来配置屏幕保护程序超时?
- testing - 可能与 Gherkin 相关的测试最佳实践,但可能不是