spring-boot - 如何在属于 AbstractConsumerSeekAware 类的 Spring Kafka 中实现 seekToEnd()?
问题描述
我有一个要求。我们有一个从 kafka 主题读取的 spring boot kafka 消费者应用程序。我们的要求是每当应用程序出现故障并出现时,我都想从最新的偏移量开始,而不是被旧值所困扰。是否有可能重置组的偏移量?我进行了一些研究,并使用 AbstractConsumerSeekAware 使用 seekToEnd() 将偏移设置到末尾,如下面的代码所示。
public class KafkaConsumer extends AbstractConsumerSeekAware {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = "${topic.consumer}")
public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
//consuming the message from consumer Record.
seekToEnd();
doSomething();
ack.acknowledge();
}
但是,当我们停止应用程序并重新启动它时,它开始从它离开的最后一个偏移量开始读取,但我们只想从应用程序启动时的偏移量读取。我们怎样才能做到这一点?
解决方案
在方法中这样做是不正确的@KafkaListener
。只有当消费者从分区传递记录时才会真正调用这个。
出于这个原因,您必须实现一个onPartitionsAssigned()
,因此消费者在开始轮询它们之前会寻找这些分区。
在文档中查看更多信息:https ://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
推荐阅读
- java - elasticsearch 6 ESIntegTestCase“代码库属性已设置”
- random - 使用 Wolfram Mathematica 获得分布的示例模拟
- r - R导入多个csv文件
- python - 如何在列表中对具有相同键的字典值求和?
- ios - layoutSubviews 添加到 UICollectionView 时冻结应用程序
- mysql - Mysql生成查询以生成自定义结果
- c# - 即使 MessageVersion 是 Soap11WSAddressing10,WCF 仍在使用 SOAP 1.2
- c# - 如何将格式化的文本从 Windows 窗体传递到 Word 应用程序?
- fiware - 如何使用 Wirecloud sth-source 算子
- python-3.x - 在 python 中使用 pymssql 从 SQL Server 检索数据正在用 False 替换 0,用 None 替换 NULL