首页 > 解决方案 > 如何在 KStream (Kafka Streams) 中加入列表的每个元素

问题描述

例如,我有一个 KStream 的购物车,每个购物车都有一个产品 ID 列表。此外,还有一个带有产品的 KStream。我如何将它们连接在一起?

public class ShoppingCart {
    List<ProductKey> productKeys;
}

public class Product {
    ProductKey key;
    String name;
}

public class ProductKey {
    String id;
}

KStream<String, ShoppingCart> shoppingCartKStream;
KStream<ProductKey, Product> productKStream;

我想要的结果看起来像这样

KStream<String, ShoppingCartWithProducts> joinedStream;
public class ShoppingCartWithProducts {
    List<Product> products;
}

有没有简单的方法来存档?

编辑:我知道有一种方法,但我觉得它太复杂了。简而言之:

  1. 我需要对 shoppingCart-KStream 进行平面映射

  2. 然后我可以将结果与产品-KStream 结合起来

  3. 对中间结果进行分组和聚合

  4. 最后加入 shoppingCart-KStream

    KStream<String, ProductKey> productKeyStream = shoppingCartKStream
            .flatMap((key, shoppingCart) -> shoppingCart.productKeys.stream()
                    .map(productKey -> KeyValue.pair(key, productKey))
                    .collect(Collectors.toList())
            );
    
    KTable<String, Product> productStreamWithShoppingCartKey = productKeyStream.toTable()
            .join(
                    productKStream.toTable(),
                    productKey -> productKey,
                    (productKey, product) -> product
            );
    
    KTable<String, ArrayList<Product>> productListStream = productStreamWithShoppingCartKey
            .groupBy(KeyValue::pair)
            .aggregate(
                    (Initializer<ArrayList<Product>>) ArrayList::new,
                    (key, value, aggregate) -> addProductToList(aggregate, value),
                    (key, value, aggregate) -> removeProductFromList(aggregate, value)
            );
    
    KStream<String, ShoppingCartWithProducts> shoppingCartWithProductsKStream = shoppingCartKStream.join(
            productListStream,
            (shoppingCart, productList) -> new ShoppingCartWithProducts(productList)
    );
    

当然它非常简化,我还需要处理墓碑等。

标签: javaapache-kafkaapache-kafka-streams

解决方案


在您定义您之后StreamsBuilder,这是 Streams DSL 的入口点。

StreamsBuilder builder = new StreamsBuilder();

您可以使用JoinWindows.of(Duration.ofMinutes(5)). 您必须使用相同类型的两个流的键,否则kafka-stream无法比较不同类型的键。它就像一个数据库连接。所以,我正在使用StringforShoppingCartProduct。然后.join(...运算符匹配相同键的事件,您可以构建新事件ShoppingCartWithProducts

KStream<String, ShoppingCart> shoppingCartKStream = ...;
KStream<String, Product> productKStream = ...;

shoppingCartKStream.join(productKStream,
    (shop, prod) -> {
        log.info("ShoppingCart: {} with Product: {}", shop, prod);

        ShoppingCartWithProducts shoppingCartWithProducts = new ShoppingCartWithProducts();
        shoppingCartWithProducts.setShoppingCart(shop);
        shoppingCartWithProducts.setProduct(prod);
        return shoppingCartWithProducts;
    },
    JoinWindows.of(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(),
        new JsonSerde<>(ShoppingCart.class),
        new JsonSerde<>(Product.class)))
   .foreach((k, v) -> log.info("ShoppingCartWithProducts ID: {}, value: {}", k, v));

你可以在这里找到更详细的信息。


推荐阅读