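apache-kafka - How do I process only the latest record per key within a given time window in Kafka Streams?
Problem description
Description: I only want to process the latest unique event for each key. I have a KStream<String, LocationChangeEvent>. Suppose the following events arrive on the stream: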
{id= "DELHI", event1},
{id= "MUMBAI", event2},
{id= "DELHI", event3},
{id= "JAIPUR", event4},
{id= "MUMBAI", event5}
Now I want to group them (say, over a 10-minute window) so that I end up with only the latest event for each key within that time window.
**EXPECTED EVENTS:**
{id= "DELHI", event3},
{id= "MUMBAI", event5},
{id= "JAIPUR", event4}
**Events output by my current implementation:**
{id= "DELHI", event1},
{id= "MUMBAI", event2},
{id= "JAIPUR", event4}
The remaining events are marked as duplicates.
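With the code attached below, I can push the first unique event to the consumer and mark every later event with the same key as a duplicate for the given period. However, I do not want to send the first event. Instead, I want to send the latest event for each key, i.e. the last event for each key within that time window.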
Properties streamsConfiguration = this.buildKafkaProperties();
StreamsBuilder builder = new StreamsBuilder();
KStream<String, LocationChangeEvent> kStream = builder.stream(this.kafkaConfigProperties.getTopicName(), Consumed.with(Serdes.String(), locationChangeEventSerde));
final StoreBuilder<WindowStore<String, LocationChangeEvent>> dedupStoreBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(storeName, retentionPeriod, numSegment, minutes, false),
Serdes.String(),
serde);
builder.addStateStore(dedupStoreBuilder);
kStream.filter((key,value)->value.getChangeId()!=null)
.transformValues(() -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> key), storeName)
.filter((k, v) -> v != null)
.foreach((requestId, object) -> {
// the call below pushes the event to the consumer
this.streamKafkaFunction(object);
});
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
}
private static class DeduplicationTransformer<K, V, E> implements ValueTransformerWithKey<K, V, V> {
private ProcessorContext context;
private WindowStore<E, Long> eventIdStore;
private final long leftDurationMs;
private final long rightDurationMs;
private final KeyValueMapper<K, V, E> idExtractor;
DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
if (maintainDurationPerEventInMs < 1) {
throw new IllegalArgumentException("maintain duration per event must be >= 1");
}
leftDurationMs = maintainDurationPerEventInMs;
rightDurationMs = maintainDurationPerEventInMs;
this.idExtractor = idExtractor;
}
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
this.context = context;
eventIdStore = (WindowStore<E, Long>)
context.getStateStore(storeName);
}
@Override
public V transform(final K key, final V value) {
final E eventId = idExtractor.apply(key, value);
LOGGER.info("Event is : {}", eventId);
if (eventId == null) {
return value;
} else {
final V output;
if (isDuplicate(eventId)) {
output = null;
} else {
output = value;
rememberNewEvent(eventId, context.timestamp());
}
return output;
}
}
private boolean isDuplicate(final E eventId) {
final long eventTime = context.timestamp();
final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
eventId,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
final boolean isDuplicate = timeIterator.hasNext();
timeIterator.close();
return isDuplicate;
}
private void rememberNewEvent(final E eventId, final long timestamp) {
eventIdStore.put(eventId, timestamp, timestamp);
}
@Override
public void close() {
}
}
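Solution
Here is a simple example that shows how to combine a windowed aggregation with suppress so that only the last value received for each key is emitted, and only once the window closes: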
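var builder = new StreamsBuilder();
builder.stream("my-input-topic")
.groupByKey()
// 10-minute tumbling windows
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
// keep only the most recently received value for each key
.reduce((value1, value2) -> value2)
// hold back intermediate updates until the window closes
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
// unwrap the Windowed key so the output uses the original key type
.toStream((windowedKey, value) -> windowedKey.key())
.to("my-output-topic");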
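To make the effect of reduce((value1, value2) -> value2) over a tumbling window concrete, here is a plain-Java model with no Kafka dependency. The class LatestPerWindow, the Event type, and the timestamps are hypothetical and exist only for illustration. It is a sketch of the idea under those assumptions, not how Kafka Streams is implemented internally.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LatestPerWindow {

    // Hypothetical stand-in for a keyed record carrying an event-time timestamp.
    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Groups events into tumbling windows (keyed by window start) and keeps,
    // per window and key, the value that arrived last in the input list.
    // Assumes non-negative timestamps.
    static Map<Long, Map<String, String>> latestPerKeyPerWindow(List<Event> events, Duration windowSize) {
        long sizeMs = windowSize.toMillis();
        Map<Long, Map<String, String>> windows = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % sizeMs);
            windows.computeIfAbsent(windowStart, start -> new LinkedHashMap<>())
                   // the newer value always wins, like reduce((v1, v2) -> v2)
                   .merge(e.key, e.value, (older, newer) -> newer);
        }
        return windows;
    }

    public static void main(String[] args) {
        // The five events from the question, one minute apart (assumed timestamps).
        List<Event> events = List.of(
            new Event("DELHI", "event1", 0L),
            new Event("MUMBAI", "event2", 60_000L),
            new Event("DELHI", "event3", 120_000L),
            new Event("JAIPUR", "event4", 180_000L),
            new Event("MUMBAI", "event5", 240_000L));
        System.out.println(latestPerKeyPerWindow(events, Duration.ofMinutes(10)));
    }
}
```

For the five events from the question, all of which fall into the same 10-minute window here, the model keeps event3 for DELHI, event5 for MUMBAI, and event4 for JAIPUR. Note that "latest" means last to arrive, not highest timestamp, which is also how the reduce above behaves.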
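Note: if you have to tolerate out-of-order events, you should consider using a grace period, i.e. TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ofMinutes(1)). Without an explicit grace period, TimeWindows.of(...) falls back to a default grace period of 24 hours, so suppress holds results back for that long. Since Kafka Streams 3.0, TimeWindows.of(...) is deprecated in favor of TimeWindows.ofSizeAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(1)) or TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)).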
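The grace period in the note can be pictured with a small plain-Java model. The class GracePeriodModel and its method names are hypothetical, and the rule is a simplified reading of the behaviour: a record is accepted into its window as long as stream time has not yet reached window end plus grace. Treat it as a sketch; the exact boundary handling in your Kafka Streams version may differ.

```java
import java.time.Duration;

class GracePeriodModel {

    // Start of the tumbling window that contains the given timestamp
    // (assumes non-negative timestamps).
    static long windowStart(long timestampMs, Duration size) {
        return timestampMs - (timestampMs % size.toMillis());
    }

    // Simplified rule: a record is still accepted while stream time has not
    // reached windowEnd + grace. After that the window counts as closed and
    // the record is dropped as late.
    static boolean isAccepted(long recordTimestampMs, long streamTimeMs, Duration size, Duration grace) {
        long windowEnd = windowStart(recordTimestampMs, size) + size.toMillis();
        return streamTimeMs < windowEnd + grace.toMillis();
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(10);
        Duration grace = Duration.ofMinutes(1);
        // A late record whose timestamp belongs to the window [0, 10 min).
        long lateRecord = Duration.ofMinutes(9).toMillis();

        // Stream time 10.5 min: the window is still open because of the grace period.
        System.out.println(isAccepted(lateRecord, Duration.ofSeconds(630).toMillis(), size, grace));
        // Stream time 11 min: window end plus grace has been reached, so the record is dropped.
        System.out.println(isAccepted(lateRecord, Duration.ofMinutes(11).toMillis(), size, grace));
    }
}
```

A longer grace period accepts more out-of-order records but also delays the suppressed output by the same amount, so it is a trade-off between completeness and latency.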