java - 缩放模式匹配 Kafka 消费者
问题描述
我有一个场景,其中有多个 Kafka 主题(每个主题有一个分区)和一个消费者组来消费记录。我在消费者组中使用单个模式匹配消费者,它匹配所有主题,因此消费所有主题中的所有记录。
我现在想扩大规模,让多个消费者(在同一个消费者组中)收听所有主题。但是,这似乎不起作用,因为所有记录仅由组中的第一个消费者使用,从而使组中的其他消费者无用。此外,我使用ExecutorService
.
我怎样才能做到这一点?下面是我的代码:
Pattern pattern = Pattern.compile(topicPattern); consumer.subscribe(pattern);
上面代码中发送的模式与所有主题的名称匹配,例如。
如果主题名称是sample_topic_1
等sample_topic_2
,我们将其与 匹配sample_topic_*$
。
解决方案
您描述的方法应该适用于您发布的代码。但是,这可能是没有足够数据供多个消费者使用的情况。或者,数据可能以“突发”的形式出现,小到足以容纳一个批次。
尽管 Kafka 中的负载理论上分布在同一消费者组的所有消费者中,但在实践中,如果只有一个“批次”的数据,那么第一个消费者可以获取所有数据,其他任何人都将一无所有。这意味着:
- 您没有发送足够的数据将其分发给所有消费者(尝试发送更多数据来验证这一点),或者
- 你有一个奇怪的配置,你配置的批次是巨大的,和/或
linger.ms
属性配置得非常高,或者 - 以上两者的结合。
我建议先尝试发送更多数据,看看是否能解决问题。如果没有,请尝试缩减到仅 1 个消费者,验证它是否仍在工作。然后再向该消费者组添加一个消费者,并查看行为是否发生变化。
推荐阅读
- python-3.x - 有没有办法使用 ElementTree 注册多个命名空间
- sql - Oracle 18c - Complex sql
- flutter - 如何在颤振中修复新创建的项目中的 !_debuglocked 错误?
- javascript - 在javascript中的同一类中的其他方法中获取数组
- regex - 为什么 Perl 的“不匹配”运算符在这里不起作用?
- machine-learning - 为什么我的“val_accuracy”从高值开始?
- java - JAVA APACHE POI:出现“我们发现 *.xlsx 中的某些内容存在问题。您是否希望我们尽可能多地尝试恢复它”错误
- typescript - TypeScript 在重新导出库时报告没有导出的成员
- android - ADB声称我已经连接到设备IP地址作为另一台设备的IP地址
- r - 将日期向量转换为从第一个日期经过的天数向量