java - 在 Java 中使用 Kafka 流创建差异流很热门?
问题描述
我正在尝试从 Kafka Java 中的 KStream 创建一个“差异”流。
我有一个输入流,其中的值是一组 Doubles V0 … Vn。输出流应计算 V0 - 0、V1 - V0、V2 - V1 ... Vn -Vn-1 之间的差异。
我的第一个想法是做这样的事情:
KStream<String, Double> stream = builder.stream(TOPIC)
KTable<String, Double> difference = stream.groupByKey().reduce(
(oldValue, newValue) -> {
return newValue - oldValue
}
).toStream()
假设我有一个具有以下值的 KStream 输入:
Key -> Value
"A1" -> 2
"B2" -> 4
"A1" -> 6
"A1" -> 10
"B2" -> 13
"A1" -> 7
我想使用以下值创建一个新的 Stream 输出:
Key -> Value
"A1" -> 2 (2-0 = 2)
"B2" -> 4 (4-0 = 4)
"A1" -> 4 (6-2 = 4)
"A1" -> 4 (10-6 = 4)
"B2" -> 9 (13-4 = 9)
"A1" -> -3 (7-10 = -3)
解决方案
你可以使用类似的东西
stream.groupByKey().aggregate(Diff::new, new Aggregator<String, Double, Diff>() {
@Override
public Diff apply(String key, Double newValue, Diff aggregate) {
Double difference = newValue - aggregate.getLastValue();
aggregate.setDifference(difference);
aggregate.setLastValue(newValue);
return aggregate;
}
}).mapValues(new ValueMapper<Diff, Double>() {
@Override
public Double apply(Diff value) {
return value.getDifference();
}
}).toStream().to("diff");
在哪里
public class Diff {
private Double lastValue = 0d;
private Double difference = 0d;
//getters and setters
// ...
}
推荐阅读
- parameters - 在 HP ALM 的测试配置中设置参数值的正确方法是什么?
- scikit-learn - sklearn KNN 回归在 k=1 时没有给出 0 的训练错误
- python - 为多个变量赋值时如何迭代dict键
- c - 为什么代码停止并用指针中止执行?
- c++ - 可以使用不可复制的成员作为使对象不可复制的替代方法吗?
- javascript - 使用javascript对数组问题进行排序
- android - 音频流和获取幅度以同时绘制波形
- javascript - 为什么 Math.log(0) 在 JavaScript 中返回 -Infinity?
- android - 每次按时都无法下载 httpclient.jar 再试一次
- c++ - 复制构造函数和传递类