首页 > 解决方案 > 嵌入式 Kafka 未显示消费者偏移量

问题描述

有一个嵌入式 kafka 实例作为测试的一部分运行。我正在尝试验证是否已阅读所有消息,但从 kafka 管理客户端得到一个空结果。

Map<TopicPartition, OffsetAndMetadata> partitionOffset = embeddedKafkaRule.getEmbeddedKafka().doWithAdminFunction(admin -> {
        try{
            return admin.listConsumerGroupOffsets(COUNTER_GROUP).partitionsToOffsetAndMetadata().get();
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    });

地图总是空的。我已经尝试设置 ack all 并设置 100ms autoOffsetCommit 等待看看这是否有任何区别,但没有运气。

 System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaRule.getEmbeddedKafka()
            .getBrokersAsString());
 System.setProperty("spring.cloud.stream.bindings.enricher-in-0.destination", COUNTER_TOPIC);
 System.setProperty("spring.cloud.stream.bindings.enricher-in-0.group", COUNTER_GROUP);
 System.setProperty("spring.cloud.stream.bindings.enricher-out-0.destination", ENRICHED_COUNTER_TOPIC);
 System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.consumer.ackEachRecord", "true");
 System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.autoCommitOffset", "true");       System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms", "100");

标签: kafka-consumer-apispring-kafkaspring-cloud-streamembedded-kafka

解决方案


你什么时候设置这些系统属性?您确定绑定使用的是嵌入式代理吗?

这对我来说很好。

@SpringBootApplication
public class So65329718Application {

    public static void main(String[] args) {
        SpringApplication.run(So65329718Application.class, args);
    }

    @Bean
    Consumer<String> consume() {
        return System.out::println;
    }

}
spring.cloud.stream.bindings.consume-in-0.group=theGroup
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So65329718ApplicationTests {

    @Autowired
    KafkaTemplate<byte[], byte[]> template;

    @Autowired
    EmbeddedKafkaBroker broker;

    @Test
    void test() throws InterruptedException {
        this.template.send("consume-in-0", "foo".getBytes());
        Thread.sleep(10_000);
        this.broker.doWithAdmin(admin -> {
            try {
                System.out.println(admin.listConsumerGroupOffsets("theGroup").partitionsToOffsetAndMetadata()
                        .get(10, TimeUnit.SECONDS));
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }
        });
    }

}
foo
...
{consume-in-0-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}

spring.kafka.bootstrap-servers如果没有特定于粘合剂的属性,则使用粘合剂)。


推荐阅读