首页 > 解决方案 > 不正确的结果 Kstream-Kstream Join with asymmetric time window

问题描述

我有 2 个名为“警报”和“干预”的流,其中包含 JSON。如果连接了警报和干预,则它们将具有相同的键。我想联系他们以检测所有在 24 小时前未进行干预的警报。
但是这个程序不起作用,结果给了我所有的警报,就好像 24 小时前没有进行任何干预一样。我重新检查了我的数据集 5 次,有些警报在警报日期前不到 24 小时内完成了干预。
这张图片说明了情况: 在此处输入图片描述
所以我需要知道在警报之前是否有干预。
程序代码:

    final KStream<String, JsonNode> alarm = ...;

    final KStream<String, JsonNode> intervention = ...;

    final JoinWindows jw = JoinWindows.of(TimeUnit.HOURS.toMillis(24)).before(TimeUnit.HOURS.toMillis(24)).after(0);

    final KStream<String, JsonNode> joinedAI = alarm.filter((String key, JsonNode value) -> {
        return value != null;
    }).leftJoin(intervention, (JsonNode leftValue, JsonNode rightValue) -> {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode actualObj = null;

        if (rightValue == null) {//No intervention before
            try {
                actualObj = mapper.readTree("{\"date\":\"" + leftValue.get("date").asText() + "\","
                        + "\"alarm\":" + leftValue.toString()
                        + "}");
            } catch (IOException ex) {
                Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
            }
            return actualObj;
        } else {
            return null;
        }
    }, jw, Joined.with(Serdes.String(), jsonSerde, jsonSerde));

    final KStream<String, JsonNode> fraude = joinedAI.filter((String key, JsonNode value) -> {
        return value != null;
    });

    fraude.foreach((key, value) -> {
        rl.println("Fraude=" + key + " => " + value);
        System.out.println("Fraude=" + key + " => " + value);
    });

    final KafkaStreams streams = new KafkaStreams(builder.build(), streamingConfig);

    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            streams.close();
            rl.close();
            el.close();
            nfl.close();
        }
    }));

综上所述,我想检测红色矩形中的图案在此处输入图像描述

PS:我确保在报警记录之前发送干预记录

标签: apache-kafkaleft-joinapache-kafka-streams

解决方案


M.Djx,

我认为现在 Kafka Streams 中的这个用例没有完美的解决方案,但我有一些想法可以让你更接近。我正准备在不久的将来提交一个 KIP 来解决这样的用例。

一点:与 KTable 不同,KStreams 不是变更日志,因此较新的事件不会用相同的键覆盖较旧的事件;它们只是共存于同一流中。我认为这就是为什么您foreach使所有警报看起来都没有干预的原因;您会看到干预之前的中间加入事件。

例如:

LEFT   RIGHT    JOIN
a:1             a:(1,null)
       a:X      a:(1,X)

foreach将在两个连接结果上调用,看起来好像缺少正确的值,而实际上只是有点晚了。

如果您在结果流上应用时间窗口,您获得一个变更日志——较新的值将覆盖较旧的值。就像是:

joinedAI
  .groupByKey()
  .windowedBy(
      TimeWindows
          .of(1000 * 60 * 60 * 24) // the window will be 24 hours in size
          .until(1000 * 60 * 60 * 48) // and we'll keep it in the state store for at least 48 hours
  ).reduce(
      new Reducer<JsonNode>() {
          @Override
          public Long apply(final JsonNode value1, final JsonNode value2) {
              return value2;
          }
      },
      Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("alerts-without-interventions")
  );

令人遗憾的是,这将生成一个具有正确语义的变更日志流,但您仍会看到中间值,因此您也不希望直接从该流触发任何操作(例如foreach)。

您可以做的一件事是安排一项工作,每天一次,从昨天"alerts-without-interventions"开始扫描窗口。您从窗口存储中获得的任何结果都将是该键的最新值。

我正在准备的 KIP 将提出一种方法,让您从窗口中过滤掉中间结果,这将让您将 foreach 附加到更改日志并让它仅在窗口的最终结果上触发。

或者,如果您的应用程序的数据不是太大,并且您不太担心边缘情况,您可以考虑使用 LinkedHashMap 或 Guava 缓存自己实现“窗口最终事件”语义。

我希望这有帮助。


推荐阅读