apache-kafka - 如何组合来自 Kafka 主题的输出
问题描述
以下是我的用例场景,其中一个应用程序将数据推送到三个不同的 kafka 主题(有唯一的应用程序 ID),输出将进入后续队列 4 和队列 5。我已经实现了如下所示的管道。
我面临的唯一问题是如何组合主题 5 中特定 app_id 的所有输出。应用程序推送多个请求,每个请求在此管道中都有唯一的 id。因此对特定 app_id 的所有请求可能不是按顺序进行的。队列 5 中可能还有其他 app_id 数据。
在为主题 5 创建消费者时,我应该为每个 app_id 使用不同的 group_id 吗?
如果您有任何想法,请帮助我。我正在使用 kafka-python。
from kafka import KafkaConsumer, KafkaProducer
KAFKA = dict()
KAFKA['producer'] = KafkaProducer(bootstrap_servers=[server]))
for queue in ['queue 1', 'queue 2', 'queue 3', 'queue 4', 'queue 5']:
KAFKA['queue'] = KafkaConsumer(queue,
bootstrap_servers=[server],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id='group'+queue)
解决方案
如果您只想一次阅读三个主题,那么您会这样做KafkaConsumer('1,2,3')
faust
如果目标是拥有多个这样的主题链,我也会建议
推荐阅读
- python - 我在数据库中的日期戳没有改变。蟒蛇/玛丽亚数据库
- php - Laravel 尾部斜杠重定向到带有公共的 url。如何解决?
- sql - 我无法更新到日期时间表
- mysql - 查询两个表输出 (MYSQL)
- docker - docker compose up 显示服务器正在运行,但浏览器显示站点无法访问
- mysql - MySQL:从具有复杂日期比较运算符的内联子查询中返回多列
- c++ - 如何在中使用用户定义的浮点类型
? - c# - 序列化/反序列化 XML 时移除根
- python - 在 pandas 的 x 轴上制作带有字符串的散点图
- java - javafx.fxml.LoadException 和 JavaFX SceneBuilder