json - Kafka Streams API GroupBy 行为
问题描述
我是 kafka 流的新手,我正在尝试使用 groupBy 函数将一些流数据聚合到 KTable 中。问题如下:
生成的消息是一个 json msg,格式如下:
{ "current_ts": "2019-12-24 13:16:40.316952",
"primary_keys": ["ID"],
"before": null,
"tokens": {"txid":"3.17.2493",
"csn":"64913009"},
"op_type":"I",
"after": { "CODE":"AAAA41",
"STATUS":"COMPLETED",
"ID":24},
"op_ts":"2019-12-24 13:16:40.316941",
"table":"S_ORDER"}
我想隔离 json 字段"after",然后用"key" = "ID"创建一个 KTable并为整个 json "after"赋值。
首先,我创建了一个 KStream 来隔离“after” json,它工作正常。
KStream 代码块:(不要注意if 语句,因为“before”和“after”格式相同。)
KStream<String, String> s_order_list = s_order
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
});
正如预期的那样,输出如下:
...
null {"CODE":"AAAA48","STATUS":"SUBMITTED","ID":6}
null {"CODE":"AAAA16","STATUS":"COMPLETED","ID":1}
null {"CODE":"AAAA3","STATUS":"SUBMITTED","ID":25}
null {"CODE":"AAAA29","STATUS":"SUBMITTED","ID":23}
...
之后,我实现了一个 KTable 到 groupBy json的“ID”。
KTable 代码块:
KTable<String, String> s_table = s_order_list
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return json.getString("ID");
});
我想创建一个错误,KTable<String, String>
但我正在创建GroupedStream<Object,String>
.
Required type: KTable<String,String>
Provided:KGroupedStream<Object,String>
no instance(s) of type variable(s) KR exist so that KGroupedStream<KR, String> conforms to KTable<String, String>
总之,问题是 KGroupedStreams 到底是什么以及如何正确实现 KTable?
解决方案
在groupBy处理器之后,您可以使用有状态处理器,例如聚合或减少(处理器返回 KTable)。你可以这样做:
KGroupedStream<String, String> s_table = s_order_list
.groupBy((key, value) ->
new JSONObject(value).getString("ID"),
Grouped.with(
Serdes.String(),
Serdes.String())
);
KTable<String, StringAggregate> aggregateStrings = s_table.aggregate(
(StringAggregate::new),
(key, value, aggregate) -> aggregate.addElement(value));
StringAggregate 看起来像:
public class StringAggregate {
private static List<String> elements = new ArrayList<>();
public StringAggregate addElement(String element){
elements.add(element);
return this;
}
//other methods
}
推荐阅读
- python - 如何从有条件的字典列表中提取字典
- c++ - 如何在 Qt 中旋转 QGraphicsPixmapItem
- java - 使用 while 循环倒数并找到平方根
- wpf - ModernWPFUI NugetPackage 'ThemeResources/XamlControlsResources' 在当前上下文中不存在/找不到 NuGet 包
- spring - 动态 JPA 查询
- c# - 在 .NET 核心应用程序的 Docker 容器中找不到 package.json
- python - 将多个字符串转换为整数
- flutter - 为什么脚手架的背景会出现一个巨大的灰色圆圈?
- swift - Swift - 物体检测中带有设备旋转的相机
- influxdb - InfluxDB:flux 是在查询中将简单计算添加为列的唯一方法吗?