apache-kafka - Apache Kafka 分组两次
问题描述
我正在编写一个应用程序,我试图计算每小时访问一个页面的用户数量。我正在尝试过滤到特定事件,按 userId 和事件小时时间分组,然后按小时分组以获取用户数。但是在尝试关闭流时,对 KTable 进行分组会导致过多的 CPU 消耗和锁定。有一个更好的方法吗?
events
.groupBy(...)
.aggregate(...)
.groupBy(...);
.count();
解决方案
鉴于上述问题的答案“我只想在一个小时的时间窗口内知道执行特定操作的用户数量”,我会建议以下内容。
假设您有这样的记录:
class ActionRecord {
String actionType;
String user;
}
您可以定义一个聚合类,如下所示:
class ActionRecordAggregate {
private Set<String> users = new HashSet<>();
public void add(ActionRecord rec) {
users.add(rec.getUser());
}
public int count() {
return users.size();
}
}
然后您的流媒体应用程序可以:
- 接受事件
- 根据事件类型重新设置密钥(
.map()
) - 按事件类型分组 (
.groupByKey()
) - 按时间窗口(选择 1 分钟但 YMMV)
- 将它们聚合成
ActionRecordAggregate
- 将它们具体化为 StateStore
所以这看起来像:
stream()
.map((key, val) -> KeyValue.pair(val.actionType, val))
.groupByKey()
.windowedBy(TimeWindows.of(60*1000))
.aggregate(
ActionRecordAggregate::new,
(key, value, agg) -> agg.add(value),
Materialized
.<String, ActionRecordAggregate, WindowStore<Bytes, byte[]>>as("actionTypeLookup")
.withValueSerde(getSerdeForActionRecordAggregate())
);
然后,要取回事件,您可以查询您的状态存储:
ReadOnlyWindowStore<String, ActionRecordAggregate> store =
streams.store("actionTypeLookup", QueryableStoreTypes.windowStore());
WindowStoreIterator<ActionRecordAggregate> wIt =
store.fetch("actionTypeToGet", startTimestamp, endTimestamp);
int totalCount = 0;
while(wIt.hasNext()) {
totalCount += wIt.next().count();
}
// totalCount is the number of distinct users in your
// time interval that raised action type "actionTypeToGet"
希望这可以帮助!
推荐阅读
- assembly - 将 4 个字节同时加载到寄存器中的 mips 命令是什么?
- node.js - spawn ffmpeg 进程因权限被拒绝而失败
- wpf - 顶级滚动查看器中的嵌套用户控件不会导致滚动条
- c - 为什么 *str1 和 *(&str1),str 是 C 中 char 数组的名称,不会得到相同的结果?
- python - 列表中语法正确的人类可读字符串(使用牛津逗号)
- openlayers - OL:更改投影(使用 Proj4)不显示 geojson 层(尽管 setVisible 为真)
- visual-studio - Visual Studio Profiler 将“[broken]”显示为函数名称
- python - 将 Python 脚本转换为 PowerShell
- python - 如何获取熊猫系列中值的索引
- node.js - 使用 Node.JS Readline:“TypeError: rl is not async iterable”