首页 > 解决方案 > 如何使用 Kafka Streams 统计在特定时间段内生成事件的用户?

问题描述

我有其中包含用户 ID 的流媒体事件。我想计算在特定时间内有多少不同的用户生成事件。但是,我是 Kafka 的初学者,我无法解决这个问题。

1 分钟内的示例事件;

{"event_name": "viewProduct", "user_id": "12"}
{"event_name": "viewProductDetails", "user_id": "23"}
{"event_name": "viewProductComments", "user_id": "12"}
{"event_name": "viewProduct", "user_id": "23"}
{"event_name": "viewProductComments", "user_id": "32"}

根据上述事件,我的代码应该生成 3 个活跃用户。

我的方法如下,但是此解决方案无法消除来自同一用户的多个事件并多次计算同一用户。

builder.stream("orders") // read from orders toic
                .mapValues(v -> { // get user_id via json parser
                    JsonNode jsonNode = null;
                    try {
                        jsonNode = objectMapper.readTree((String) v);
                        return jsonNode.get("user_id").asText();
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                        return "";
                    })
                .selectKey((k, v) -> "1") // put same key to every user_id
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(1))) // use time windows
                .count() // count values

标签: apache-kafkaapache-kafka-streams

解决方案


我可能会在这里遗漏一些东西,你为什么不这样做:

.selectKey((k, v) -> v)

这将按值对记录进行分组,您之前使用user_id.


推荐阅读