首页 > 解决方案 > 定期统计 Kafka 流中的消息/事件数

问题描述

我通过使用来自一个 Kafka 主题的消息创建了一个 Kafka 流。我想计算我在1 分钟级别收到的消息数量是多少。

假设,我通过以下方式收到了消息:

t1 -> message1
t1 -> message2
t1 -> message3

1分钟后,我收到这样的消息

t2 -> message4
t2 -> message5

假设count我的 Java 应用程序中有一个整数变量。我想要的是从应用程序开始到 1 分钟,这个计数值应该是 3。在第二分钟结束时,这个计数变量应该变成 2。这是因为在第一分钟我收到了 3 条消息并且在第二分钟我收到了 2 条消息。

到目前为止我的代码

import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class CountMessage {
    private static KafkaStreams kafkaStreams;

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_first_count_2");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.43:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);

        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        

        // consuming stream
        String kafkaTopic = "my_kafka_topic_2";

        System.out.println("Starting the application");
        KStream<String, String> myStream = streamsBuilder
                .stream(kafkaTopic);

        myStream.foreach(new ForeachAction<String, String>() {
            @SneakyThrows
            @Override
            public void apply(String key, String value) {
                System.out.println("key received = " + key + "---<<<" + value);
            }
        });

        final Topology topology = streamsBuilder.build();
        kafkaStreams = new KafkaStreams(topology, props);
        kafkaStreams.start();
    }
}

标签: javaapache-kafkaapache-kafka-streams

解决方案


不确定您是否与使用 Kafka Streams 相关,但您可以使用 ksqlDB 做到这一点:

SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss') AS TS, 
       COUNT(*) AS MSG_COUNT 
FROM   SRC_STREAM 
         WINDOW TUMBLING (SIZE 1 MINUTE) 
GROUP BY 'X' 
EMIT CHANGES;

推荐阅读