java - Kafka 消费者分配返回空集
问题描述
我已经订阅了一个 Kafka 主题,如下所示。只有在为消费者分配了分区后,我才需要运行一些逻辑。
然而consumer.assignment()
,无论我等多久,它都会以空集的形式返回。如果我没有 while 循环然后做一个 consumer.poll()
我确实从主题中获取记录。有人能告诉我为什么会这样吗?
consumer.subscribe(topics);
Set<TopicPartition> assigned=Collections.emptySet();
while(isAssigned) {
assigned = consumer.assignment();
if(!assigned.isEmpty()) {
isAssigned= false;
}
}
//consumer props
Properties props = new Properties();
props.put("bootstrap.servers", "xxx:9092,yyy:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://xxx:8081");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("max.poll.records", "100");
解决方案
在你打电话之前poll()
,消费者只是在闲置。
只有在poll()
被调用后,它才会启动与集群的连接,获取分配的分区并尝试获取消息。
Javadoc中提到了这一点:
订阅一组主题后,消费者会在调用 poll(Duration) 时自动加入群组。
一旦您开始调用poll()
,您可以使用assignment()
甚至注册 aConsumerRebalanceListener
以在从您的消费者实例分配或撤销分区时收到通知。
推荐阅读
- wpf - 在 XamDataGrid 中实现条件格式,如 excel
- angular - 角度的引导数组
- r - 如何修复 R 中印地语字符的条形图显示?
- typescript - TypeScript 通用函数 - 为数组应用函数时出现“参数不可分配错误”
- django-rest-framework - 我需要有关如何为每个用户创建一个表作为用户的联系人列表的帮助
- javascript - 打字稿在具有不同类型的数组中查找
- asp.net - ASP Web api 在本地机器上工作但不能在生产环境中工作
- css - Angular 5 - Material Snackbar 无法正常工作
- mysql - 当字段值与数字相乘时,MySQL选择错误的结果
- .net - .Net CAD 2014,自定义功能区菜单无法显示