java - KafkaConsumer Java API subscribe() 与 assign()
问题描述
我是 Kafka Java API 的新手,我正在处理来自特定 Kafka 主题的记录。
我知道我可以使用方法subscribe()
从主题开始轮询记录。assign()
如果我想从主题的选定分区开始轮询记录,Kafka 还提供了方法。
我想了解这是否是两者之间的唯一区别?
解决方案
是的,因为组中的每个消费者都subscribe
将group.id
动态分配给订阅方法中提供的主题列表的分区,并且每个分区都可以由该组中的一个消费者线程使用。这是通过平衡消费者组中所有成员之间的分区来实现的,以便每个分区都分配给组中的一个消费者
assign
将手动分配一个分区列表给这个消费者。并且此方法不使用消费者的组管理功能(不需要group.id
)
主要区别在于assign(Collection)
会在动态分区分配和消费者组协调上松散控制器
消费者也可以使用 assign(Collection) 手动分配特定的分区(类似于旧的“简单”消费者)。在这种情况下,动态分区分配和消费者组协调将被禁用。
订阅
public void subscribe(java.util.Collection<java.lang.String> topics)
subscribe 方法订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其视为unsubscribe().
作为组管理的一部分,消费者将跟踪属于特定组的消费者列表,并在以下事件之一触发时触发重新平衡操作 -
Number of partitions change for any of the subscribed list of topics
Topic is created or deleted
An existing member of the consumer group dies
A new member is added to an existing consumer group via the join API
分配
public void assign(java.util.Collection<TopicPartition> partitions)
assign 方法手动将分区列表分配给此使用者。如果给定的主题分区列表为空,则将其视为与 unsubscribe() 相同。
通过这种方法手动分配主题不使用消费者的组管理功能。因此,当组成员或集群和主题元数据发生变化时,不会触发重新平衡操作。
推荐阅读
- c++ - 如何理解我的 clion 性能分析报告?
- c++ - Convert Linux Debian gcc command to CMakeLists.txt
- firebase - Firebase 消息传递的插件和版本冲突
- vb.net - 添加到表单时,用户控件空格
- python - Dropbox OAuth2 Flow 和 CSRF 保护:webhook 是为哪个用户提供的?
- java - 当它改变位置时,是否有任何方法可以跟踪列表视图中的项目?
- python - 将函数调用注入python函数
- python - 如何将来自两个不同字典的键组合成一个列表,该列表是第三个字典的值
- r - 使用 R 和 gsub 有没有办法删除两个特殊字符并用一个替换
- flutter - 如何使用 BLoC 以编程方式关闭 CupertinoActionSheet?