java - Kafka 流和窗口化以在时间窗口内保持计数
问题描述
我是 Stackoverflow 的新手,如果问题被问得不好,请原谅我。非常感谢任何帮助/灵感!
我正在使用 Kafka 流将传入数据过滤到我的数据库中。传入的消息看起来像{"ID":"X","time":"HH:MM"}
和其他一些参数,在这种情况下无关紧要。我设法让一个 java 应用程序运行,它从一个主题中读取并打印出传入的消息。现在我想做的是使用 KTables(?) 对具有相同 ID 的传入消息进行分组,然后使用会话窗口将表分组到时隙中。我想要在时间轴上连续运行 X 分钟的时间窗口。
首先当然是让 KTable 运行以计算具有相同 ID 的传入消息。我想做的应该是这样的:
ID Count
X 1
Y 3
Z 1
不断更新,因此从表中删除具有过时时间戳的消息。
我不是百分百肯定,但我认为我想要的是 KTables 而不是 KStreams,对吗?如果这是实现我想要的结果的正确方法,我如何实现滑动窗口?
这是我现在使用的代码。它仅从主题中读取并打印传入的消息。
private static List<String> printEvent(String o) {
System.out.println(o);
return Arrays.asList(o);
}
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(srcTopic)
.flatMapValues(value -> printEvent(value));
我想知道我必须添加什么来实现我想要的上述输出,以及我把它放在我的代码中的什么位置。
在此先感谢您的帮助!
解决方案
是的,您需要 Ktable 和滑动窗口,我还建议您查看水印功能,以处理延迟传递消息。 例子
KTable<Windowed<Key>, Value> oneMinuteWindowed = yourKStream
.groupByKey()
.reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
//where your adder can be as simple as (val, agg) -> agg + val
//for primitive types or as complex as you need
推荐阅读
- ajax - 在 Chrome 浏览器中加载 iframe 内容
- java - 为什么返回的通用列表在 intellij idea 中是合法的
- xamarin - Xamarin Android:导航单击侦听器停止处理方向更改
- c# - 复选框选中的更改将禁用另一个复选框
- jquery - Bootstrap 4,如何在从容器 1 中选择时显示容器 2 中的列表组项目
- javascript - 如何在 Angular 中将标签与它们之外的表单字段相关联?
- authentication - 如何使路由仅在登录 asp.net core 2.1 后可用?
- javascript - 为什么放大和缩小第一次在 React js 中不起作用?
- php - 简化 PHP 中很长的 if 语句
- java - Java 抛出异常:此站点包含在另一个站点中:“/”