How to process the latest record for each key within a given time window in Kafka Streams?

Problem description

I only want to process the latest unique event for each key. I have a `KStream<String, LocationChangeEvent> kStream`. Suppose these events arrive on the stream:

{id= "DELHI", event1},
{id= "MUMBAI", event2},
{id= "DELHI", event3},
{id= "JAIPUR", event4},
{id= "MUMBAI", event5} 

Now I want to group them (say, over 10 minutes) so that, within the given time window, I keep only the latest event for each key.

**Expected events:**

    {id= "DELHI", event3},
    {id= "MUMBAI", event5},
    {id= "JAIPUR", event4}

**Events output by my current implementation:**

    {id= "DELHI", event1},
    {id= "MUMBAI", event2},
    {id= "JAIPUR", event4}

The rest are marked as duplicates.

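With the code below, I can push the first unique event to the consumer, and every further event with the same key within the given time is marked as a duplicate. Instead of the first event, however, I want to send the latest event for each key, i.e. the last event for each key within that time window.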
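
To make the difference concrete, here is a minimal plain-Java sketch (no Kafka dependency; the class, record, and method names are hypothetical) that applies "keep first" and "keep last" to the five example events. `putIfAbsent` mirrors what the deduplication transformer below effectively does, while `put` produces the expected result.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FirstVsLast {
    // Hypothetical stand-in for a (key, LocationChangeEvent) record
    record Event(String key, String value) {}

    // Keeps the first event per key: later events with the same key are ignored
    static Map<String, String> keepFirst(List<Event> events) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Event e : events) {
            result.putIfAbsent(e.key(), e.value());
        }
        return result;
    }

    // Keeps the last event per key: each new event overwrites the previous one
    static Map<String, String> keepLast(List<Event> events) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Event e : events) {
            result.put(e.key(), e.value());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("DELHI", "event1"),
                new Event("MUMBAI", "event2"),
                new Event("DELHI", "event3"),
                new Event("JAIPUR", "event4"),
                new Event("MUMBAI", "event5"));
        System.out.println(keepFirst(events)); // {DELHI=event1, MUMBAI=event2, JAIPUR=event4}
        System.out.println(keepLast(events));  // {DELHI=event3, MUMBAI=event5, JAIPUR=event4}
    }
}
```

Keeping the last value per key is exactly a "reduce where the newer value wins", which is what a windowed `reduce((v1, v2) -> v2)` does per window.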

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KeyValueMapper;
    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

        // ... enclosing class and setup-method header not shown ...
            Properties streamsConfiguration = this.buildKafkaProperties();
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, LocationChangeEvent> kStream = builder.stream(
                    this.kafkaConfigProperties.getTopicName(),
                    Consumed.with(Serdes.String(), locationChangeEventSerde));

            // The transformer stores a timestamp (Long) per event id, so the store's
            // value type and serde must be Long, not LocationChangeEvent.
            // (This long-based persistentWindowStore overload is deprecated in newer versions.)
            final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
                    Stores.persistentWindowStore(storeName, retentionPeriod, numSegment, minutes, false),
                    Serdes.String(),
                    Serdes.Long());
            builder.addStateStore(dedupStoreBuilder);

            kStream.filter((key, value) -> value.getChangeId() != null)
                    // forwards the FIRST event per key; later ones within the window become null
                    .transformValues(() -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> key), storeName)
                    .filter((k, v) -> v != null)
                    // pushes the surviving event to the consumer
                    .foreach((requestId, object) -> this.streamKafkaFunction(object));

            KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
            streams.start();
        }

        // storeName and LOGGER must be static members of the enclosing class,
        // because this nested class is static.
        private static class DeduplicationTransformer<K, V, E> implements ValueTransformerWithKey<K, V, V> {

            private ProcessorContext context;
            private WindowStore<E, Long> eventIdStore;
            private final long leftDurationMs;
            private final long rightDurationMs;
            private final KeyValueMapper<K, V, E> idExtractor;

            DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
                if (maintainDurationPerEventInMs < 1) {
                    throw new IllegalArgumentException("maintain duration per event must be >= 1");
                }
                leftDurationMs = maintainDurationPerEventInMs;
                rightDurationMs = maintainDurationPerEventInMs;
                this.idExtractor = idExtractor;
            }

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
                eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName);
            }

            @Override
            public V transform(final K key, final V value) {
                final E eventId = idExtractor.apply(key, value);
                LOGGER.info("Event is : {}", eventId);
                if (eventId == null) {
                    return value;
                }
                if (isDuplicate(eventId)) {
                    return null;          // this id was already seen within the window: drop it
                }
                rememberNewEvent(eventId, context.timestamp());
                return value;             // first occurrence within the window: forward it
            }

            // Duplicate = the store already holds this id within +/- the window
            // around the current record's timestamp.
            private boolean isDuplicate(final E eventId) {
                final long eventTime = context.timestamp();
                try (WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
                        eventId,
                        eventTime - leftDurationMs,
                        eventTime + rightDurationMs)) {
                    return timeIterator.hasNext();
                }
            }

            private void rememberNewEvent(final E eventId, final long timestamp) {
                eventIdStore.put(eventId, timestamp, timestamp);
            }

            @Override
            public void close() {
            }
        }
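
The duplicate check can be modeled in plain Java to see why event3 and event5 are dropped. This sketch uses hypothetical names, assumes one event per minute and a 10-minute window, replaces the `WindowStore` with a per-key `TreeSet` of timestamps, and applies the same plus/minus window range test as `isDuplicate`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Plain-Java model of the question's DeduplicationTransformer (no Kafka dependency). */
public class DedupModel {
    private final long windowMs;
    // key -> remembered timestamps; stands in for the WindowStore<E, Long>
    private final Map<String, NavigableSet<Long>> store = new HashMap<>();

    DedupModel(long windowMs) {
        this.windowMs = windowMs;
    }

    // Same test as isDuplicate(): any remembered timestamp in [t - window, t + window]?
    boolean isDuplicate(String key, long eventTime) {
        NavigableSet<Long> times = store.get(key);
        return times != null
                && !times.subSet(eventTime - windowMs, true, eventTime + windowMs, true).isEmpty();
    }

    // Same decision as transform(): forward and remember the first event, return null otherwise
    String transform(String key, String value, long eventTime) {
        if (isDuplicate(key, eventTime)) {
            return null;
        }
        store.computeIfAbsent(key, k -> new TreeSet<>()).add(eventTime);
        return value;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        DedupModel dedup = new DedupModel(10 * minute);
        String[][] events = {
                {"DELHI", "event1"}, {"MUMBAI", "event2"}, {"DELHI", "event3"},
                {"JAIPUR", "event4"}, {"MUMBAI", "event5"}};
        for (int i = 0; i < events.length; i++) {
            // hypothetical timestamps: one event per minute
            String out = dedup.transform(events[i][0], events[i][1], i * minute);
            System.out.println(events[i][1] + (out == null ? " -> duplicate" : " -> forwarded"));
        }
        // event1 -> forwarded, event2 -> forwarded, event3 -> duplicate,
        // event4 -> forwarded, event5 -> duplicate
    }
}
```

Because the transformer decides the moment each record arrives, it can only ever forward the first event of a key; emitting the last one requires buffering values until the window ends.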

Tags: apache-kafka, kafka-consumer-api, apache-kafka-streams, spring-kafka, kafka-producer-api

Solution


Here is a simple example that uses a windowed aggregation together with `suppress` to emit only the last value received for each key, and only once the window closes:

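        // imports: java.time.Duration, org.apache.kafka.streams.StreamsBuilder, org.apache.kafka.streams.kstream.{TimeWindows, Suppressed}
        // Uses the default key/value serdes from the Streams config.
        var builder = new StreamsBuilder();
        builder.stream("my-input-topic")
               .groupByKey()
               // TimeWindows.of() defaults to a ~24h grace period, so without grace() the
               // results would be held for about a day (3.0+: TimeWindows.ofSizeWithNoGrace)
               .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ZERO))
               // keep only the most recent value per key within each window
               .reduce((value1, value2) -> value2)
               // emit one final result per key and window, only after the window closes
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               // unwrap Windowed<K> back to the plain key so the default key serde can write it
               .toStream((windowedKey, value) -> windowedKey.key())
               .to("my-output-topic");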
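
To see when results appear, the following plain-Java model (hypothetical names; a simplification, not Kafka's implementation) mimics tumbling windows, `reduce((value1, value2) -> value2)`, and `untilWindowCloses`. Windows close on stream time, which only advances as records with later timestamps arrive, so in this sketch the 10-minute window is flushed only when a record stamped at minute 10 shows up.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of windowedBy(tumbling) + reduce(last wins) + suppress(untilWindowCloses). */
public class LastPerWindowModel {
    record WindowKey(long start, String key) {}

    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    // (window start, key) -> latest value; LinkedHashMap keeps first-seen order
    private final Map<WindowKey, String> buffer = new LinkedHashMap<>();

    LastPerWindowModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Processes one record and returns the "key=value" results whose windows just closed. */
    List<String> process(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);              // stream time never goes backwards
        long start = timestamp - Math.floorMod(timestamp, sizeMs);  // tumbling window [start, start + size)
        if (start + sizeMs > streamTime - graceMs) {
            buffer.put(new WindowKey(start, key), value);           // last value wins
        }                                                           // otherwise the record is late and dropped
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<WindowKey, String>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<WindowKey, String> entry = it.next();
            if (entry.getKey().start() + sizeMs + graceMs <= streamTime) { // window end + grace reached
                emitted.add(entry.getKey().key() + "=" + entry.getValue());
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        LastPerWindowModel model = new LastPerWindowModel(10 * minute, 0);
        String[][] events = {
                {"DELHI", "event1"}, {"MUMBAI", "event2"}, {"DELHI", "event3"},
                {"JAIPUR", "event4"}, {"MUMBAI", "event5"}};
        for (int i = 0; i < events.length; i++) {
            // hypothetical timestamps: one event per minute, all inside window [0, 10 min)
            System.out.println(model.process(events[i][0], events[i][1], i * minute)); // []
        }
        // a later record (hypothetical "event6" at minute 10) advances stream time and closes the window
        System.out.println(model.process("DELHI", "event6", 10 * minute));
        // [DELHI=event3, MUMBAI=event5, JAIPUR=event4]
    }
}
```

In a real application this means the last window stays buffered until newer records arrive, so a quiet topic can delay the final output.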

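Note: if you need to tolerate out-of-order events, consider using a grace period, e.g. `TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ofMinutes(1))`. The final result for each window is then emitted only after the grace period has also elapsed.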
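
The grace-period rule can be checked with a little arithmetic. A minimal sketch, assuming a 10-minute tumbling window, a 1-minute grace, and millisecond timestamps (helper names are hypothetical): as I understand the Kafka Streams semantics, a record is still added to its window while `windowEnd > streamTime - grace`, and `untilWindowCloses` emits once `streamTime >= windowEnd + grace`.

```java
public class GraceCheck {
    // Start of the tumbling window that contains the timestamp
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // A record still updates its window while windowEnd > streamTime - grace
    static boolean accepted(long ts, long streamTime, long sizeMs, long graceMs) {
        long windowEnd = windowStart(ts, sizeMs) + sizeMs;
        return windowEnd > Math.max(streamTime, ts) - graceMs;
    }

    // untilWindowCloses() emits the final result once streamTime >= windowEnd + grace
    static long emitAt(long ts, long sizeMs, long graceMs) {
        return windowStart(ts, sizeMs) + sizeMs + graceMs;
    }

    public static void main(String[] args) {
        long min = 60_000L, size = 10 * min, grace = 1 * min;
        // record stamped 09:00 arriving when stream time is already 10:30: still accepted
        System.out.println(accepted(9 * min, 10 * min + 30_000L, size, grace)); // true
        // the same record arriving at stream time 11:00: dropped as late
        System.out.println(accepted(9 * min, 11 * min, size, grace)); // false
        // the window [0, 10 min) is emitted at stream time 11 min
        System.out.println(emitAt(9 * min, size, grace) / min); // 11
    }
}
```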

