spring-boot - Spring Boot REST服务Kafka主题无法使用指定数量的消息
问题描述
我有一个简单的 Spring 启动服务,它按需调用并使用来自主题的指定数量的消息。要使用的消息数作为参数传递。每 30 分钟呼叫一次服务。每条消息大小约为 1.6 kb。当我调用服务并传递 3000 的参数时,我期望返回 300 万条消息,但我每次总是收到大约 1100 或 1200 条消息。我只有一个分区的一个主题。这是一项按需服务,因此不使用 while 循环并将轮询时间设置为 30 秒。但是响应会在 10 秒内返回,即使 MAX_POLL_RECORDS_CONFIG 为 3000、4000 或 5000,返回的记录数也约为 1200。只是好奇即使轮询时间为 30 秒,消息是否有任何大小限制,我怎样才能实现更多吞吐量或接近极限。每次调用服务时都会执行以下操作。这是服务的调用方式http://example.com/messages?limit=5000
Properties p = new Properties();
//limit is the value coming in as a query paramter and can be 3000, 4000 or 5000
p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, limit);
p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG , 15000);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG , 22 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG , 50 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG , 500);
p.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG , 50 * 1024 * 1024);
consumer = consumerFactory.createConsumer("my-group-id", null, null, p);
consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));
ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(30));
// processing the messages somehow I always get ~1200 messages
.........................................
.........................................
.................................
consumer.commitAsync();
// return list of messages
谢谢
解决方案
推荐阅读
- selenium - 自动提示文本自动 travelocity.com 上的 xpath 失败
- excel - 来自 2 列或更多列的 VBA 唯一值
- python - Python unittest - 在 0.000 秒内额外运行 0 次测试
- javascript - 使用 javascript 滚动侧边栏
- python - 使用 pywinauto 访问 Putty 输出流
- scala - 如何在 scalaj-http 中为 multipart/form-data 请求添加文件作为参数?
- angular - 如何将数据表放在中间而不占整个宽度
- r - 估算/填补时间段之间的缺失值
- ios - 如何有条件地显示远程通知?
- python - 这行中的“<”在做什么?:data += dt < b