apache-kafka - 并行从单个流主题写入不同的主题
问题描述
我有一个流,它将消息映射到两个不同的 map() 调用,并进一步被过滤并写入两个不同的主题。
KStream<String, byte[]>[] stream = builder.<String, byte[]>stream("source-topic");
stream.map(logic1OnData).filter(
(key, value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic1", Produced.with(Serdes.String(), Serdes.String())
stream.map(logic2OnData).filter(
(key, value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic2", Produced.with(Serdes.String(), Serdes.String())
有没有办法可以并行运行 stream.map(logc1OnData)... 和 stream.map(logic2OnData) ?看起来他们一个接一个地运行,即第一个映射被执行并写入topic1,然后第二个映射被执行并写入topic2 FYI ..我不想要num.threads.count,因为我的流输入来自单个主题和我正在运行同一应用程序的多个实例以从源主题主题中读取,以在使用时实现并行性。
我正在寻找的是在执行和写入不同主题时的并行性
解决方案
您正在查看的是您的操作添加到拓扑中的顺序。一旦拓扑被执行,记录器将按照它们到达的顺序流经拓扑,但在运行之前logic2OnData
不会等待logic1OnData
完成处理。
如果您担心性能,您可以考虑stream threads
获得更多并行性。
编辑:看来我可能错过了这个问题。
单个子拓扑不允许您以并行方式运行每个分支。但是,您可以使用repartition()
将 logic2OnData 制作成它自己的子拓扑,并且repartition()
调用之后的所有内容都将能够与之前的所有内容并行运行。
推荐阅读
- javascript - 将内存地址位置作为html的href中的链接
- vba - 在 Excel 的两个工作簿中查找匹配数据,并在大型数据集上格式化匹配结果
- python - 哪一个是高效的,使用 sql 连接查询,或使用 pandas 合并查询?
- azure - 暂停 Azure 存储同步服务
- php - 在 laravel 中对字符串调用成员函数 format()
- assembly - 使用 msdos 文件编码时的汇编语言问题
- r - R - 替换文本中的单词/短语
- spring-boot - Kubernetes Secret TLS 证书 P12 和 Spring Boot 部署不起作用
- php - 脚本中的 PHP MySQL 语法错误,但直接输入 MySQL 时语句有效
- java - 即使遇到问题,也要在跨区域内提交