首页 > 解决方案 > Spring Boot REST服务Kafka主题无法使用指定数量的消息

问题描述

我有一个简单的 Spring 启动服务,它按需调用并使用来自主题的指定数量的消息。要使用的消息数作为参数传递。每 30 分钟呼叫一次服务。每条消息大小约为 1.6 kb。当我调用服务并传递 3000 的参数时,我期望返回 300 万条消息,但我每次总是收到大约 1100 或 1200 条消息。我只有一个分区的一个主题。这是一项按需服务,因此不使用 while 循环并将轮询时间设置为 30 秒。但是响应会在 10 秒内返回,即使 MAX_POLL_RECORDS_CONFIG 为 3000、4000 或 5000,返回的记录数也约为 1200。只是好奇即使轮询时间为 30 秒,消息是否有任何大小限制,我怎样才能实现更多吞吐量或接近极限。每次调用服务时都会执行以下操作。这是服务的调用方式http://example.com/messages?limit=5000

Properties p = new Properties();
//limit is the value coming in as a query paramter and can be 3000, 4000 or 5000
p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, limit);
p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG , 15000);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG , 22 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG , 50 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG , 500);
p.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG , 50 * 1024 * 1024);

consumer = consumerFactory.createConsumer("my-group-id", null, null, p);
consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));


ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(30));

// processing the messages somehow I always get ~1200 messages 
.........................................
.........................................
.................................
consumer.commitAsync();

// return list of messages

谢谢

标签: spring-bootrestapache-kafkakafka-consumer-apispring-kafka

解决方案


推荐阅读