首页 > 解决方案 > 如何从一种输入类型写入多种输出类型?

问题描述

我是卡夫卡流的新手。

我能够实现以下目标。我有一个输入主题。消息对象(比如 Flower)。我有一个将花转换为水果的变压器。然后我将 Fruit 存储到输出主题。

我想要实现的就像下面的东西

  1. 我将水果存储到水果主题(...如上)
  2. 我将(成功的)Flower 存储到 Success_Flower 主题(...另外)

(如果不能像上面那样区分花,如果我可以将传入的花(成功或失败......两者)存储到 OutputFlower 主题,那就没问题了)

这可能吗?如果是,如何?如果我需要提供更多详细信息,请告诉我。谢谢

标签: apache-kafkaapache-kafka-streams

解决方案


为此,您可以使用以下方法branch

    KStream<String, Either<Flower, Fruit>>[] flowerAndFruitStreams = inputMessages
            .flatMapValues(flower -> List.<Either<Flower, Fruit>>of(Either.left(flower), Either.right(fromFlowerToFruit(flower)))) 
            .branch((key, value) -> value.isLeft(), (key, value) -> value.isRight());

    KStream<String, Either<Flower, Fruit>> flowerStream = flowerAndFruitStreams[0];
    KStream<String, Either<Flower, Fruit>> fruitsStream = flowerAndFruitStreams[1];

解释

如您所见,分支方法返回 a KStream<K, V>[],这意味着根据应用于此方法的谓词将输入拆分为许多流。因此,在将花转换为水果后,您应该返回一个对象,该对象要么包含花,要么包含水果,以便拥有相同对象类型的流。为此,我们将借鉴函数式编程的概念Either,您可以在下面找到它:

import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

public class Either<L, R> {

    private final Optional<L> left;
    private final Optional<R> right;

    private Either(Optional<L> l, Optional<R> r) {
        left = l;
        right = r;
    }

    public static <L, R> Either<L, R> left(L value) {
        return new Either<>(Optional.of(value), Optional.empty());
    }

    public static <L, R> Either<L, R> right(R value) {
        return new Either<>(Optional.empty(), Optional.of(value));
    }

    public boolean isLeft() {
        return left.isPresent();
    }

    public boolean isRight() {
        return right.isPresent();
    }

    public L getLeft() {
        return left.get();
    }

    public R getRight() {
        return right.get();
    }

    public <T> T map(Function<? super L, ? extends T> leftFunc, Function<? super R, ? extends T> rightFunc) {
        return left.<T>map(leftFunc).orElseGet(() -> right.map(rightFunc).get());
    }

    public <T> Either<T, R> mapLeft(Function<? super L, ? extends T> leftFunc) {
        return new Either<>(left.map(leftFunc), right);
    }

    public <T> Either<L, T> mapRight(Function<? super R, ? extends T> rightFunc) {
        return new Either<>(left, right.map(rightFunc));
    }

    public void apply(Consumer<? super L> leftFunc, Consumer<? super R> rightFunc) {
        left.ifPresent(leftFunc);
        right.ifPresent(rightFunc);
    }
}

我没有编译代码,所以请仔细检查语法。此代码基于 Java 13 和 KafkaStreams 2.5.0,但也适用于最新版本。


推荐阅读