apache-kafka - 重新平衡后的 KafkaStream stateStore 陈旧数据
问题描述
我们使用 Kafka 流来处理传入的 GPS 位置。要求如下:如果在 x 分钟内没有收到某个驱动程序的位置,则将该驱动程序标记为不可用。对于每个新位置,我们使用 driverId 作为键和实际位置作为值填充状态存储。到目前为止,一切都很好。然后我们要引入调度程序来检查状态存储并将位置超过 x 分钟的每个驱动程序标记为不可用。
我们正在使用处理器 API 来创建我们的拓扑。
StoreBuilder currentTrackabilityStateStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("driver-trackability-store"),
Serdes.String(),
driverTrackabilityStateSerde());
Topology topology = new Topology()
.addSource(partitionedDriverGpsSourceName,
Serdes.String().deserializer(),
Serdes.ByteArray().deserializer(),
MessageTopic.DRIVER_GPS_INTERNAL.getValue())
.addProcessor(processorNameRequestId,
ProcessorRequestId::new,
partitionedDriverGpsSourceName)
.addProcessor(deserializeProcessorName,
ProcessorDeserialize::new,
processorNameRequestId)
.addProcessor(trackabilityProcessorName,
()-> new ProcessorTrackability(trackabilityChangesSinkName),
deserializeProcessorName)
.addSink(trackabilityChangesSinkName,
MessageTopic.TRACKABILITY.getValue(),
Serdes.String().serializer(),
driverTrackabilityStateSerde().serializer(),
trackabilityProcessorName)
.addStateStore(currentTrackabilityStateStore, trackabilityProcessorName);
初始化调度程序
public void init(ProcessorContext context) {
this.context = context;
this.kvStore = (KeyValueStore<String, GpsInfo>) context.getStateStore("driver-trackability-store");
schedule = this.context.schedule(Duration.of(45, ChronoUnit.SECONDS), PunctuationType.WALL_CLOCK_TIME, new GpsTrackabilityPunctuator(this.kvStore, this.context, trackabilityChangesSink));
}
处理方法
public void process(String key, GpsInfo gpsInfo) {
// omitted
this.kvStore.put(key, gpsInfo);
// omitted
}
最后,还有标点符号
public void punctuate(long timestamp) {
log.info("Punctuating...");
KeyValueIterator<String, GpsInfo> iterator = this.kvStore.all(); // Problem is right here
}
问题出在 this.kvStore.all() 中,它显然在重新平衡后保留了陈旧的信息。例如,一个 id 为 10 的驱动程序将被分配到正在实例 1 上处理的分区,并且存储区将填充 y 条记录,然后发生重新平衡,现在正在实例 2 上处理驱动程序 10。状态成功迁移到实例 2,因此当计划在实例 2 上运行时,我们将拥有所有以前的位置以及即将到来的新位置。问题是调度程序仍在实例 1 上运行。然后发生的情况是没有保存新位置,调度程序将驱动程序 10 标记为不可用,而实例 2 上的调度程序将同一驱动程序标记为可用,因为它具有最新位置. 那么有没有办法在重新平衡后从状态存储中清除陈旧的记录?我错过了什么吗?
编辑1:
状态存储在启动时从更改日志主题备份。这是为每个任务完成的。每个任务都从更改日志中获取所有消息(无论分区如何)。这就是为什么 kvStore.all() 返回所有记录,而不仅仅是当前任务正在处理的记录。
解决方案
这是我的一个错误。经过数小时的调试后,我得出的结论是,该问题与源主题有关,该主题具有带键和不带键的记录。
它以某种方式扰乱了商店再平衡机制。
解决方案是删除源主题并创建一个仅包含带有键的记录的新主题。
推荐阅读
- gem5 - 如何在 gem5 中使用时间戳跟踪已执行的来宾函数符号名称?
- gradle - 带有 Cucumber Gradle 的 TestNG,无法从命令行覆盖 cucumberoptions
- c# - 二维轨迹路径与 Unity 中的对象路径不匹配
- html - 即使删除任何 HTML DOM 元素,如何保持固定布局
- java - 如何在没有 IDENTITY 的情况下从 sql server 获取生成的密钥?
- webrtc - iOS 14 Safari 和手机睡眠/解锁中的 WebRTC/getUserMedia 问题
- html - 如何将编辑图标和数据放在一个垫表中
- powershell - 如何使用 PowerShell 在 TFS 中签出文件?
- heroku - 在 Heroku 的 celery/redis 任务中使用 BERT(蒸馏)模型
- mysql - 使用mysql的多值过滤器