首页 > 解决方案 > KStream to KStream Join- 输出记录在窗口内没有匹配记录的情况下发布可配置的时间

问题描述

需要一些关于 KStream/KTable 使用案例的意见/帮助。

设想:

我有 2 个具有共同键的主题--requestId。

  1. input_time(requestId,StartTime)
  2. 完成时间(请求 ID,结束时间)

input_time 中的数据在时间 t1 填充,completion_time 中的数据在 t+n 填充。(n 是完成进程所用的时间)。

目标 通过加入来自主题的数据来比较请求所花费的时间,并在违反阈值时间的情况下发出警报。

可能会发生该过程可能失败并且数据可能根本无法到达completion_time主题的请求。在这种情况下,我们打算使用检查当前时间是否远远超过自开始时间以来的特定(比如说 5 秒)阈值。

  1. input_time(req1,100) completion_time(req1,104) --> 没有警报作为104-100 < 5(配置值)
  2. input_time(req2,100) completion_time(req2,108) --> 以 req2,108 为108-100 >5发出警报
  3. input_time(req3,100) completion_time 无记录--> 如果当前时间超过 105,则使用 req3,currentSysTime 作为currentSysTime - 100 > 5发出警报

已尝试的选项。 1) 尝试了 KTable-KTable 和 KStream-Kstream 外连接,但第三种情况总是失败。

    final KTable<String,Long> startTimeTable =   builder.table("input_time",Consumed.with(Serdes.String(),Serdes.Long()));
    final KTable<String,Long> completionTimeTable = builder.table("completion_time",Consumed.with(Serdes.String(),Serdes.Long()));     
    KTable<String,Long> thresholdBreached =startTimeTable .outerJoin(completionTimeTable,
            new MyValueJoiner());
    thresholdBreached.toStream().filter((k,v)->v!=null)
            .to("finalTopic",Produced.with(Serdes.String(),Serdes.Long()));

细木工

 public Long apply(Long startTime,Long endTime){

        // if input record itself is not available then we cant use any alerting.
        if (null==startTime){
            log.info("AlertValueJoiner check: the start time itself is null so returning null");
            return null;
        }
        // current processing time is the time used.
        long currentTime= System.currentTimeMillis();
        log.info("Checking startTime {} end time {} sysTime {}",startTime,endTime,currentTime);
        if(null==endTime && currentTime-startTime>5000){
            log.info("Alert:No corresponding record from file completion yet currentTime {} startTime {}"
                    ,currentTime,startTime);
            return currentTime-startTime;
        }else if(null !=endTime && endTime-startTime>5000){
            log.info("Alert: threshold breach for file completion startTime {} endTime {}"
                    ,startTime,endTime);
            return endTime-startTime;
        }
    return null;
    }

2)尝试了根据线程推荐的自定义逻辑方法 如何管理Kafka KStream到Kstream窗口连接? -- 这种方法在场景 2 和 3 中停止工作。

有没有使用 DSL 或处理器处理所有三种情况的情况?

不确定我们是否可以使用某种标点符号来监听窗口何时更改并检查当前窗口中的流记录,如果没有找到匹配的记录,则使用 systime 生成结果。?

标签: apache-kafka-streams

解决方案


由于逻辑涉及的性质,它肯定必须结合 DSL 和处理器 API 来完成。

  1. 使用自定义转换器和状态存储与配置值进行比较。(案例 1 和 2)
  2. 添加了一个基于挂钟的标点符号来处理第三种情况

推荐阅读