java - 手动定义容器且未使用侦听器注释时如何在spring-kafka中暂停/恢复消费者
问题描述
我正在创建一个通用的 Kafka 模块,它处理所有与 Kafka 相关的配置代码,只需要作为依赖项添加到父应用程序中。然后应用程序只需要扩展一个类并覆盖其方法并只编写必要的业务逻辑。对于自定义错误处理程序和恢复,我没有使用任何注释。
public abstract class AbstractKafkaConsumerImpl<K, V> {
@Value("${kafka.fixed-back-off.interval}")
private long fixedBackOffInterval;
@Value("${kafka.fixed-back-off.max-attempts}")
private long fixedBackOffMaxAttempts;
@Autowired
private KafkaConsumerConfiguration defaultConsumerPropsConfiguration;
@Autowired
private KafkaProducerConfiguration kafkaProducerConfiguration;
private KafkaTemplate<K, V> dlqSender;
@PostConstruct
public void init() {
DefaultKafkaProducerFactory<K, V> producerFactory =
new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration.getProducer());
dlqSender = new KafkaTemplate<>(producerFactory);
getLogger().info("DLQ sender initialised for topic: {}", getTopicName());
}
@EventListener(ApplicationReadyEvent.class)
public void consume() {
final String consumerTopicName = getTopicName();
Map<String, Object> consumerProps =
new HashMap<>(defaultConsumerPropsConfiguration.getConsumer());
configureConsumerProperties(consumerProps);
DefaultKafkaConsumerFactory<K, V> kafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps);
SeekToCurrentErrorHandler seekToCurrentErrorHandler;
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(dlqSender, (r, e) -> {
getLogger().error("Retries exhausted for consumer topic: {}. Sending to DLQ", r.topic());
return new TopicPartition(consumerTopicName + "_DLQ", r.partition());
});
seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(recoverer,
new FixedBackOff(fixedBackOffInterval, fixedBackOffMaxAttempts));
seekToCurrentErrorHandler.addNotRetryableException(NonRetryableException.class);
ContainerProperties containerProperties = new ContainerProperties(consumerTopicName);
AcknowledgingMessageListener<K, V> messageListener = new AcknowledgingMessageListener<K, V>() {
@Override
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
processMessage(data);
acknowledge(data, acknowledgment);
}
};
containerProperties.setMessageListener(messageListener);
containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
seekToCurrentErrorHandler.setCommitRecovered(true);
KafkaMessageListenerContainer<K, V> container =
new KafkaMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);
container.setErrorHandler(seekToCurrentErrorHandler);
container.start();
}
private void acknowledge(ConsumerRecord<K, V> receivedRecord, Acknowledgment acknowledgment) {
acknowledgment.acknowledge();
getLogger().debug("Topic: {}. Offset acknowledged: {}.", receivedRecord.topic(),
receivedRecord.offset());
}
protected void configureConsumerProperties(Map<String, Object> consumerProps) {
consumerProps.put(JsonDeserializer.KEY_DEFAULT_TYPE, getKeyClass());
consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, getValueClass());
}
protected abstract void processMessage(ConsumerRecord<K, V> receivedRecord);
protected abstract Logger getLogger();
protected abstract String getTopicName();
protected abstract Class<K> getKeyClass();
protected abstract Class<V> getValueClass();
}
由于每个扩展类的消费者都有不同的键和值类型,我必须为每个消费者创建不同的消费者工厂。
现在需要添加动态暂停/恢复任何消费者的功能。
我遇到了KafkaListenerEndpointRegistry
,但它只注册了 Kafka Listener 注释方法。
有没有办法可以将手动创建的容器注册到注册表并使用它的暂停/恢复调用?
另外,顺便说一句,上述为每个消费者创建工厂的做法是否可行?请记住,代码应该是通用的并且与所有 Key 和 Value 类型兼容。
解决方案
你不应该像那样创建工厂和容器;它们必须是 bean 才能正常工作,除非您接管 Springs 容器初始化工作(注入事件发布者等)的责任。
如果您必须手动创建它们,您应该使用它们将它们注册为 bean,GenericApplicationContext.registerBean()
然后 Spring 将初始化它们。
该框架确实不是为在 Spring 之外使用而设计的。也许就您要解决的问题提出另一个问题,有人可以提供比您提出的解决方案更好的解决方案。
container.start();
您需要保留对容器的引用以暂停和恢复它。
如果将其注册为 bean,则可以从 Application 上下文中按名称获取它。
推荐阅读
- python - python中的分区字符串并获取冒号前的所有值
- vue.js - 如何检查 Nuxt.js SSR 中的 api 状态?
- javascript - 第一次单击元素不起作用,第二次单击起作用
- reactjs - 如何在 React 中切换到旧版本?
- python - 热力循环,从 T2/T1 中找到一个比率
- python - 如何安装 PyQt5?
- javascript - 如何改进这个基本的秒表?
- javascript - 如何使用nodejs javascript创建带附件的多部分/表单数据请求
- flutter - 从导航器堆栈的中间移除
- spring - 如何在 Spring Boot 中处理对实体的部分更新