首页 > 解决方案 > 重新平衡后的 KafkaStream stateStore 陈旧数据

问题描述

我们使用 Kafka 流来处理传入的 GPS 位置。要求如下:如果在 x 分钟内没有收到某个驱动程序的位置,则将该驱动程序标记为不可用。对于每个新位置,我们使用 driverId 作为键和实际位置作为值填充状态存储。到目前为止,一切都很好。然后我们要引入调度程序来检查状态存储并将位置超过 x 分钟的每个驱动程序标记为不可用。

我们正在使用处理器 API 来创建我们的拓扑。

StoreBuilder currentTrackabilityStateStore = Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("driver-trackability-store"),
                    Serdes.String(),
                    driverTrackabilityStateSerde());

Topology topology = new Topology()
            .addSource(partitionedDriverGpsSourceName,
                    Serdes.String().deserializer(),
                    Serdes.ByteArray().deserializer(),
                    MessageTopic.DRIVER_GPS_INTERNAL.getValue())
            .addProcessor(processorNameRequestId,
                    ProcessorRequestId::new,
                    partitionedDriverGpsSourceName)
            .addProcessor(deserializeProcessorName,
                    ProcessorDeserialize::new,
                    processorNameRequestId)
            .addProcessor(trackabilityProcessorName,
                    ()-> new ProcessorTrackability(trackabilityChangesSinkName),
                    deserializeProcessorName)
            .addSink(trackabilityChangesSinkName,
                    MessageTopic.TRACKABILITY.getValue(),
                    Serdes.String().serializer(),
                    driverTrackabilityStateSerde().serializer(),
                    trackabilityProcessorName)
            .addStateStore(currentTrackabilityStateStore, trackabilityProcessorName);

初始化调度程序

    public void init(ProcessorContext context) {
        this.context = context;
        this.kvStore = (KeyValueStore<String, GpsInfo>) context.getStateStore("driver-trackability-store");
        schedule = this.context.schedule(Duration.of(45, ChronoUnit.SECONDS), PunctuationType.WALL_CLOCK_TIME, new GpsTrackabilityPunctuator(this.kvStore, this.context, trackabilityChangesSink));
}

处理方法

   public void process(String key, GpsInfo gpsInfo) {
       // omitted
       this.kvStore.put(key, gpsInfo);
      // omitted
   }

最后,还有标点符号

 public void punctuate(long timestamp) {
    log.info("Punctuating...");
    KeyValueIterator<String, GpsInfo> iterator = this.kvStore.all(); // Problem is right here
}

问题出在 this.kvStore.all() 中,它显然在重新平衡后保留了陈旧的信息。例如,一个 id 为 10 的驱动程序将被分配到正在实例 1 上处理的分区,并且存储区将填充 y 条记录,然后发生重新平衡,现在正在实例 2 上处理驱动程序 10。状态成功迁移到实例 2,因此当计划在实例 2 上运行时,我们将拥有所有以前的位置以及即将到来的新位置。问题是调度程序仍在实例 1 上运行。然后发生的情况是没有保存新位置,调度程序将驱动程序 10 标记为不可用,而实例 2 上的调度程序将同一驱动程序标记为可用,因为它具有最新位置. 那么有没有办法在重新平衡后从状态存储中清除陈旧的记录?我错过了什么吗?

编辑1:

状态存储在启动时从更改日志主题备份。这是为每个任务完成的。每个任务都从更改日志中获取所有消息(无论分区如何)。这就是为什么 kvStore.all() 返回所有记录,而不仅仅是当前任务正在处理的记录。

标签: apache-kafkaapache-kafka-streams

解决方案


这是我的一个错误。经过数小时的调试后,我得出的结论是,该问题与源主题有关,该主题具有带键和不带键的记录。

它以某种方式扰乱了商店再平衡机制。

解决方案是删除源主题并创建一个仅包含带有键的记录的新主题。


推荐阅读