java - 定期统计 Kafka 流中的消息/事件数
问题描述
我通过使用来自一个 Kafka 主题的消息创建了一个 Kafka 流。我想计算我在1 分钟级别收到的消息数量是多少。
假设,我通过以下方式收到了消息:
t1 -> message1
t1 -> message2
t1 -> message3
1分钟后,我收到这样的消息
t2 -> message4
t2 -> message5
假设count
我的 Java 应用程序中有一个整数变量。我想要的是从应用程序开始到 1 分钟,这个计数值应该是 3。在第二分钟结束时,这个计数变量应该变成 2。这是因为在第一分钟我收到了 3 条消息并且在第二分钟我收到了 2 条消息。
到目前为止我的代码
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class CountMessage {
private static KafkaStreams kafkaStreams;
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_first_count_2");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.43:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class);
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// consuming stream
String kafkaTopic = "my_kafka_topic_2";
System.out.println("Starting the application");
KStream<String, String> myStream = streamsBuilder
.stream(kafkaTopic);
myStream.foreach(new ForeachAction<String, String>() {
@SneakyThrows
@Override
public void apply(String key, String value) {
System.out.println("key received = " + key + "---<<<" + value);
}
});
final Topology topology = streamsBuilder.build();
kafkaStreams = new KafkaStreams(topology, props);
kafkaStreams.start();
}
}
解决方案
不确定您是否与使用 Kafka Streams 相关,但您可以使用 ksqlDB 做到这一点:
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss') AS TS,
COUNT(*) AS MSG_COUNT
FROM SRC_STREAM
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY 'X'
EMIT CHANGES;
推荐阅读
- reactjs - Powershell中的Webpack脚本错误但不是CMD.exe
- excel - Calculate Avg Price, Realized gain & Unrealized gain via UDF using FIFO method
- java - System.nanoTime() 的 Java 时代?
- dart - 错误:参数类型“双?” 不能分配给参数类型“双”。镖
- json - Metricbeat http模块嵌套json
- node.js - 纱线构建不工作,构建失败,因为进程退出太早
- r - 从异构日期列中解析日期
- javascript - pm2 的 env-cmd 似乎有问题
- android - 如何在android视图中覆盖样式的属性
- stripe-payments - 将资金转移到条带用户卡