apache-kafka - 如何从一种输入类型写入多种输出类型?
问题描述
我是卡夫卡流的新手。
我能够实现以下目标。我有一个输入主题。消息对象(比如 Flower)。我有一个将花转换为水果的变压器。然后我将 Fruit 存储到输出主题。
我想要实现的就像下面的东西
- 我将水果存储到水果主题(...如上)
- 我将(成功的)Flower 存储到 Success_Flower 主题(...另外)
(如果不能像上面那样区分花,如果我可以将传入的花(成功或失败......两者)存储到 OutputFlower 主题,那就没问题了)
这可能吗?如果是,如何?如果我需要提供更多详细信息,请告诉我。谢谢
解决方案
为此,您可以使用以下方法branch
:
KStream<String, Either<Flower, Fruit>>[] flowerAndFruitStreams = inputMessages
.flatMapValues(flower -> List.<Either<Flower, Fruit>>of(Either.left(flower), Either.right(fromFlowerToFruit(flower))))
.branch((key, value) -> value.isLeft(), (key, value) -> value.isRight());
KStream<String, Either<Flower, Fruit>> flowerStream = flowerAndFruitStreams[0];
KStream<String, Either<Flower, Fruit>> fruitsStream = flowerAndFruitStreams[1];
解释
如您所见,分支方法返回 a KStream<K, V>[]
,这意味着根据应用于此方法的谓词将输入拆分为许多流。因此,在将花转换为水果后,您应该返回一个对象,该对象要么包含花,要么包含水果,以便拥有相同对象类型的流。为此,我们将借鉴函数式编程的概念Either
,您可以在下面找到它:
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
public class Either<L, R> {
private final Optional<L> left;
private final Optional<R> right;
private Either(Optional<L> l, Optional<R> r) {
left = l;
right = r;
}
public static <L, R> Either<L, R> left(L value) {
return new Either<>(Optional.of(value), Optional.empty());
}
public static <L, R> Either<L, R> right(R value) {
return new Either<>(Optional.empty(), Optional.of(value));
}
public boolean isLeft() {
return left.isPresent();
}
public boolean isRight() {
return right.isPresent();
}
public L getLeft() {
return left.get();
}
public R getRight() {
return right.get();
}
public <T> T map(Function<? super L, ? extends T> leftFunc, Function<? super R, ? extends T> rightFunc) {
return left.<T>map(leftFunc).orElseGet(() -> right.map(rightFunc).get());
}
public <T> Either<T, R> mapLeft(Function<? super L, ? extends T> leftFunc) {
return new Either<>(left.map(leftFunc), right);
}
public <T> Either<L, T> mapRight(Function<? super R, ? extends T> rightFunc) {
return new Either<>(left, right.map(rightFunc));
}
public void apply(Consumer<? super L> leftFunc, Consumer<? super R> rightFunc) {
left.ifPresent(leftFunc);
right.ifPresent(rightFunc);
}
}
我没有编译代码,所以请仔细检查语法。此代码基于 Java 13 和 KafkaStreams 2.5.0,但也适用于最新版本。
推荐阅读
- typescript - 输入“字符串”| null 不可分配给类型“字符串”
- python - OSError:保存tensorflow yolov4模型时无法创建文件
- android - 视频未显示在 android Gallery 中
- python - 在列出总共两列的每第二行之后插入索引?
- reactjs - 具有 useEffect 的反应组件的单元测试
- javascript - 如何在输入中添加模式和标题
- woocommerce - 创建简码以仅显示给定父名称 Woocommerce 的子类别
- r - npregbw 上的 slurm_apply 函数
- unity3d - 如何使光线投射无法检测到统一 3d 中的某些对撞机?
- python - 使用正则表达式对字符串进行分组