首页 > 解决方案 > 如何从自定义 PartitionAssignor 实现中获取机架 ID 或消费者信息

问题描述

我需要实现一个基于机架感知的自定义 Kafka PartitionAssignor。我的自定义分配器将覆盖订阅方法,以便能够读取消费者的实例信息:

    /**
     * Return a serializable object representing the local member's subscription. This can include
     * additional information as well (e.g. local host/rack information) which can be leveraged in
     * {@link #assign(Cluster, Map)}.
     * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
     *               and variants
     * @return Non-null subscription with optional user data
     */
    Subscription subscription(Set<String> topics);

该方法的想法是读取当前消费者的实例信息,例如 rack.id 以发布到 ConsumerCoordinator,后者会将它们转发给 assign() 方法。不幸的是,我还没有找到获取消费者信息的机制,因为在运行时没有对它的引用。问题是:如何从我的自定义 Kafka PartitionAssignor 实现中获取消费者的实例信息?

谢谢。

标签: apache-kafka

解决方案


订阅类有一个字节缓冲区,您可以将任意数据放入:

class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData; <---- HERE
}

您将此数据设置为PartitionAssignor每个消费者必须运行的数据。然后这个订阅(包括数据)被交给PartitionAssignor一些选定的消费者来计算新的分配:

Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

推荐阅读