java - 如何在 KStream (Kafka Streams) 中加入列表的每个元素
问题描述
例如,我有一个 KStream 的购物车,每个购物车都有一个产品 ID 列表。此外,还有一个带有产品的 KStream。我如何将它们连接在一起?
public class ShoppingCart {
List<ProductKey> productKeys;
}
public class Product {
ProductKey key;
String name;
}
public class ProductKey {
String id;
}
KStream<String, ShoppingCart> shoppingCartKStream;
KStream<ProductKey, Product> productKStream;
我想要的结果看起来像这样
KStream<String, ShoppingCartWithProducts> joinedStream;
public class ShoppingCartWithProducts {
List<Product> products;
}
有没有简单的方法来存档?
编辑:我知道有一种方法,但我觉得它太复杂了。简而言之:
我需要对 shoppingCart-KStream 进行平面映射
然后我可以将结果与产品-KStream 结合起来
对中间结果进行分组和聚合
最后加入 shoppingCart-KStream
KStream<String, ProductKey> productKeyStream = shoppingCartKStream .flatMap((key, shoppingCart) -> shoppingCart.productKeys.stream() .map(productKey -> KeyValue.pair(key, productKey)) .collect(Collectors.toList()) ); KTable<String, Product> productStreamWithShoppingCartKey = productKeyStream.toTable() .join( productKStream.toTable(), productKey -> productKey, (productKey, product) -> product ); KTable<String, ArrayList<Product>> productListStream = productStreamWithShoppingCartKey .groupBy(KeyValue::pair) .aggregate( (Initializer<ArrayList<Product>>) ArrayList::new, (key, value, aggregate) -> addProductToList(aggregate, value), (key, value, aggregate) -> removeProductFromList(aggregate, value) ); KStream<String, ShoppingCartWithProducts> shoppingCartWithProductsKStream = shoppingCartKStream.join( productListStream, (shoppingCart, productList) -> new ShoppingCartWithProducts(productList) );
当然它非常简化,我还需要处理墓碑等。
解决方案
在您定义您之后StreamsBuilder
,这是 Streams DSL 的入口点。
StreamsBuilder builder = new StreamsBuilder();
您可以使用JoinWindows.of(Duration.ofMinutes(5))
. 您必须使用相同类型的两个流的键,否则kafka-stream
无法比较不同类型的键。它就像一个数据库连接。所以,我正在使用String
forShoppingCart
和Product
。然后.join(...
运算符匹配相同键的事件,您可以构建新事件ShoppingCartWithProducts
。
KStream<String, ShoppingCart> shoppingCartKStream = ...;
KStream<String, Product> productKStream = ...;
shoppingCartKStream.join(productKStream,
(shop, prod) -> {
log.info("ShoppingCart: {} with Product: {}", shop, prod);
ShoppingCartWithProducts shoppingCartWithProducts = new ShoppingCartWithProducts();
shoppingCartWithProducts.setShoppingCart(shop);
shoppingCartWithProducts.setProduct(prod);
return shoppingCartWithProducts;
},
JoinWindows.of(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(),
new JsonSerde<>(ShoppingCart.class),
new JsonSerde<>(Product.class)))
.foreach((k, v) -> log.info("ShoppingCartWithProducts ID: {}, value: {}", k, v));
你可以在这里找到更详细的信息。
推荐阅读
- python - Django - 通用 FormView - 将输入传递到模板中
- angular - Gridster高度可扩展
- node.js - 如何在我的代码中使用 telegraf-inline-menu 模块
- javascript - 在 react-native 中找不到变量 itemData
- mysql - 无法在 Azure AppService 中创建独立的 MySQL Docker 容器
- java - 在任何位置插入 Word 文本框 - Apache POI
- c++ - C++ 为什么在定义编译和链接之前引用外部实例的程序
- python - Python 中 xgb.train 和 xgb.XGBRegressor 之间的结果不匹配
- java - Java 8:Class.getName() 减慢字符串连接链
- c++ - 在 ECLIPSE 中用 C 语言打印出 HelloWorld