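java - How do I stop a container once all partitions are paused?
Problem description
I have a use case in which I create listeners dynamically, based on API calls. After receiving a particular message, I pause the partition it came from. For example, when I receive a message with offset 100, that partition is paused. This happens for every partition. Once all partitions are paused, I want to stop the container.
Here is my code: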
public class OffsetBasedMessageListener implements ConsumerAwareMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
        // Pause the record's partition once offset 100 is reached.
        if (consumerRecord.offset() == 100) {
            consumer.pause(Collections.singleton(
                    new TopicPartition(consumerRecord.topic(), consumerRecord.partition())));
        }
    }
}
I have also configured an EventListener that fires when the container is idle.
@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
    Collection<TopicPartition> collection =
            event.getContainer(ConcurrentMessageListenerContainer.class).getAssignedPartitions();
    for (TopicPartition topicPartition : collection) {
        System.out.println(
                event.getContainer(ConcurrentMessageListenerContainer.class).isPartitionPaused(topicPartition));
    }
}
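So what I am doing now is checking whether every partition is paused, and then I will stop the container. But isPartitionPaused always returns false, even when it should return true.
I am using Spring Boot with Spring Kafka.
Can someone tell me what I am doing wrong? Or is there another way to achieve this?
Thanks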
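Solution
You should not pause the consumer directly; the container does not know that you paused it, which is why isPartitionPaused returns false.
Instead, call pausePartition on the container.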
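Note that, either way, the consumer does not actually pause until all the records returned by the previous poll have been processed. Set max.poll.records to 1 to pause immediately.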
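To make the effect of max.poll.records concrete, here is a minimal toy model in plain Java. It does not use the Kafka client; the class and method names are hypothetical, and it only assumes the behaviour described above: a pause requested while a batch is being processed is honoured before the next poll, so the rest of the current batch is still delivered.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (not the Kafka client). Each loop iteration stands for one
// poll that returns up to maxPollRecords offsets from a single partition.
class PauseSimulation {

    /** Returns the offsets the listener sees when it requests a pause at pauseAtOffset. */
    static List<Long> deliveredOffsets(long totalRecords, int maxPollRecords, long pauseAtOffset) {
        List<Long> delivered = new ArrayList<>();
        boolean pauseRequested = false;
        long next = 0;
        // A pause request is only checked here, i.e. before the next poll.
        while (next < totalRecords && !pauseRequested) {
            long batchEnd = Math.min(next + maxPollRecords, totalRecords);
            for (long offset = next; offset < batchEnd; offset++) {
                delivered.add(offset); // the listener still receives the rest of the batch
                if (offset == pauseAtOffset) {
                    pauseRequested = true;
                }
            }
            next = batchEnd;
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Batches of 4: offsets 6 and 7 arrive after the pause was requested at 5.
        System.out.println(deliveredOffsets(10, 4, 5)); // [0, 1, 2, 3, 4, 5, 6, 7]
        // Batches of 1: nothing is delivered past offset 5.
        System.out.println(deliveredOffsets(10, 1, 5)); // [0, 1, 2, 3, 4, 5]
    }
}
```

With a batch size of 1, the record that triggers the pause is always the last one in its batch, which is why that setting makes the pause appear immediate.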
Edit
This works fine for me...
@SpringBootApplication
@RestController
public class So67430862Application {

    private static final Logger LOG = LoggerFactory.getLogger(So67430862Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So67430862Application.class, args);
    }

    Set<ConcurrentMessageListenerContainer<String, String>> containers = ConcurrentHashMap.newKeySet();

    int id;

    @Autowired
    Creator creator;

    @PostMapping(path = "/send/{topic}")
    public void sendFoo(@PathVariable String topic) {
        ConcurrentMessageListenerContainer<String, String> container = this.creator.create(topic);
        container.getContainerProperties().setGroupId("group" + ++id);
        container.getContainerProperties().setMessageListener(new Listener(container));
        this.containers.add(container);
        container.start();
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        ConcurrentMessageListenerContainer<String, String> container =
                event.getContainer(ConcurrentMessageListenerContainer.class);
        if (this.containers.contains(container)) {
            // Ask the container (not the consumer) whether each assigned partition is paused.
            boolean allPaused = container.getAssignedPartitions()
                    .stream()
                    .map(part -> container.isPartitionPaused(part))
                    .allMatch(paused -> paused);
            LOG.info("All paused? {}", allPaused);
            if (allPaused) {
                container.stop(() -> { });
                this.containers.remove(container);
            }
        }
    }

//    @Bean
//    public NewTopic topic() {
//        return TopicBuilder.name("so67430862").partitions(5).replicas(1).build();
//    }
//
//
//    @Bean
//    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
//        return args -> {
//            IntStream.range(0, 5).forEach(p -> {
//                IntStream.range(0, 10).forEach(i -> template.send("so67430862", p, null, "test"));
//            });
//        };
//    }

}
@Component
class Creator {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    Creator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    ConcurrentMessageListenerContainer<String, String> create(String topic) {
        return this.factory.createContainer(topic);
    }

}
class Listener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    final ConcurrentMessageListenerContainer<String, String> container;

    Listener(ConcurrentMessageListenerContainer<String, String> container) {
        this.container = container;
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        LOG.info(ListenerUtils.recordToString(data, true));
        if (data.offset() == 5) {
            LOG.info("Pausing partition {}", data.partition());
            // Pause through the container so that isPartitionPaused reflects it.
            this.container.pausePartition(new TopicPartition(data.topic(), data.partition()));
        }
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}
spring.kafka.consumer.max-poll-records=1
spring.kafka.listener.idle-event-interval=5000
2021-05-10 11:51:15.792 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@0
2021-05-10 11:51:15.797 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@1
2021-05-10 11:51:15.799 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@2
2021-05-10 11:51:15.801 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@3
2021-05-10 11:51:15.803 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@4
2021-05-10 11:51:15.809 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@5
2021-05-10 11:51:15.809 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 0
2021-05-10 11:51:15.814 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@0
2021-05-10 11:51:15.816 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@1
2021-05-10 11:51:15.819 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@2
2021-05-10 11:51:15.823 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@3
2021-05-10 11:51:15.828 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@4
2021-05-10 11:51:15.831 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@5
2021-05-10 11:51:15.831 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 1
2021-05-10 11:51:15.835 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@0
2021-05-10 11:51:15.838 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@1
2021-05-10 11:51:15.841 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@2
2021-05-10 11:51:15.844 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@3
2021-05-10 11:51:15.846 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@4
2021-05-10 11:51:15.849 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@5
2021-05-10 11:51:15.850 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 4
2021-05-10 11:51:15.854 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@0
2021-05-10 11:51:15.858 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@1
2021-05-10 11:51:15.861 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@2
2021-05-10 11:51:15.864 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@3
2021-05-10 11:51:15.866 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@4
2021-05-10 11:51:15.871 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@5
2021-05-10 11:51:15.871 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 2
2021-05-10 11:51:15.875 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@0
2021-05-10 11:51:15.877 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@1
2021-05-10 11:51:15.880 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@2
2021-05-10 11:51:15.883 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@3
2021-05-10 11:51:15.886 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@4
2021-05-10 11:51:15.889 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@5
2021-05-10 11:51:15.889 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 3
2021-05-10 11:51:20.893 INFO 58682 --- [ consumer-0-C-1] com.example.demo.So67430862Application : All paused? true
2021-05-10 11:51:20.895 INFO 58682 --- [ consumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions so67430862-0, so67430862-1, so67430862-4, so67430862-2, so67430862-3
2021-05-10 11:51:20.895 INFO 58682 --- [ consumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group1: partitions revoked: [so67430862-0, so67430862-1, so67430862-4, so67430862-2, so67430862-3]
2021-05-10 11:51:20.895 INFO 58682 --- [ consumer-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-be137c2c-2bbf-4235-a143-15d33a0cfe52 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
2021-05-10 11:51:20.896 INFO 58682 --- [ consumer-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group1-1, groupId=group1] Unsubscribed all topics or patterns and assigned partitions
2021-05-10 11:51:20.896 INFO 58682 --- [ consumer-0-C-1] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService
2021-05-10 11:51:20.897 INFO 58682 --- [ consumer-0-C-1] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2021-05-10 11:51:20.897 INFO 58682 --- [ consumer-0-C-1] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-05-10 11:51:20.898 INFO 58682 --- [ consumer-0-C-1] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2021-05-10 11:51:20.901 INFO 58682 --- [ consumer-0-C-1] o.a.kafka.common.utils.AppInfoParser : App info kafka.consumer for consumer-group1-1 unregistered
2021-05-10 11:51:20.902 INFO 58682 --- [ consumer-0-C-1] essageListenerContainer$ListenerConsumer : group1: Consumer stopped
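The commented-out code creates a topic with 5 partitions and 10 records in each partition; the listener pauses each partition at offset 5. As you can see, the remaining records are not delivered.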
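One detail of the "all paused" check may be worth guarding against: allMatch returns true for an empty stream, so if the idle event were to fire while no partitions are assigned, the check would report "all paused" and the container would be stopped. Whether that can happen depends on your setup; the sketch below is plain Java with a hypothetical helper name, and only shows one way to require at least one assigned partition.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

class AllPausedCheck {

    // Hypothetical helper: true only if at least one partition is assigned
    // and every assigned partition is reported as paused.
    static <T> boolean allPaused(Collection<T> assigned, Predicate<? super T> isPaused) {
        return assigned != null
                && !assigned.isEmpty()
                && assigned.stream().allMatch(isPaused);
    }

    public static void main(String[] args) {
        List<String> none = List.of();
        // allMatch on an empty stream is vacuously true, even with an always-false predicate.
        System.out.println(none.stream().allMatch(p -> false));          // true
        // The guarded helper treats "nothing assigned" as "not all paused".
        System.out.println(allPaused(none, p -> false));                  // false
        System.out.println(allPaused(List.of("t-0", "t-1"), p -> true));  // true
        System.out.println(allPaused(List.of("t-0", "t-1"), p -> p.equals("t-0"))); // false
    }
}
```

In the idle handler, the same idea would be to pass container.getAssignedPartitions() and container::isPartitionPaused to such a helper instead of calling allMatch directly.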