首页 > 解决方案 > 在 Java 中使用 Kafka 流创建差异流很热门?

问题描述

我正在尝试从 Kafka Java 中的 KStream 创建一个“差异”流。

我有一个输入流,其中的值是一组 Doubles V0 … Vn。输出流应计算 V0 - 0、V1 - V0、V2 - V1 ... Vn -Vn-1 之间的差异。

我的第一个想法是做这样的事情:

    KStream<String, Double> stream = builder.stream(TOPIC)

    KTable<String, Double> difference = stream.groupByKey().reduce(
            (oldValue, newValue) -> {
              return newValue - oldValue
            }
    ).toStream()

假设我有一个具有以下值的 KStream 输入:

Key  -> Value
"A1" -> 2 
"B2" -> 4
"A1" -> 6
"A1" -> 10
"B2" -> 13 
"A1" -> 7

我想使用以下值创建一个新的 Stream 输出:

Key  -> Value
"A1" ->  2  (2-0  =  2) 
"B2" ->  4  (4-0  =  4)
"A1" ->  4  (6-2  =  4)
"A1" ->  4  (10-6 =  4)
"B2" ->  9  (13-4 =  9)
"A1" -> -3  (7-10 = -3)

标签: javaapache-kafkastream

解决方案


你可以使用类似的东西

        stream.groupByKey().aggregate(Diff::new, new Aggregator<String, Double, Diff>() {

        @Override
        public Diff apply(String key, Double newValue, Diff aggregate) {
            Double difference = newValue - aggregate.getLastValue();
            aggregate.setDifference(difference);
            aggregate.setLastValue(newValue);
            return aggregate;
        }
        }).mapValues(new ValueMapper<Diff, Double>() {

        @Override
        public Double apply(Diff value) {
            return value.getDifference();
        }

    }).toStream().to("diff");

在哪里

public class Diff {

  private Double lastValue = 0d;

  private Double difference = 0d;
  //getters and setters
  // ...
}

推荐阅读