首页 > 解决方案 > KafkaConsumer Java API subscribe() 与 assign()

问题描述

我是 Kafka Java API 的新手,我正在处理来自特定 Kafka 主题的记录。

我知道我可以使用方法subscribe()从主题开始轮询记录。assign()如果我想从主题的选定分区开始轮询记录,Kafka 还提供了方法。

我想了解这是否是两者之间的唯一区别?

标签: javaapache-kafkakafka-consumer-api

解决方案


是的,因为组中的每个消费者都subscribegroup.id动态分配给订阅方法中提供的主题列表的分区,并且每个分区都可以由该组中的一个消费者线程使用。这是通过平衡消费者组中所有成员之间的分区来实现的,以便每个分区都分配给组中的一个消费者

assign将手动分配一个分区列表给这个消费者。并且此方法不使用消费者的组管理功能(不需要group.id

主要区别在于assign(Collection)会在动态分区分配和消费者组协调上松散控制器

消费者也可以使用 assign(Collection) 手动分配特定的分区(类似于旧的“简单”消费者)。在这种情况下,动态分区分配和消费者组协调将被禁用。

订阅

public void subscribe(java.util.Collection<java.lang.String> topics)

subscribe 方法订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其视为unsubscribe().

作为组管理的一部分,消费者将跟踪属于特定组的消费者列表,并在以下事件之一触发时触发重新平衡操作 -

Number of partitions change for any of the subscribed list of topics
Topic is created or deleted
An existing member of the consumer group dies
A new member is added to an existing consumer group via the join API

分配

public void assign(java.util.Collection<TopicPartition> partitions)

assign 方法手动将分区列表分配给此使用者。如果给定的主题分区列表为空,则将其视为与 unsubscribe() 相同。

通过这种方法手动分配主题不使用消费者的组管理功能。因此,当组成员或集群和主题元数据发生变化时,不会触发重新平衡操作。


推荐阅读