spring-kafka - 使用 @StreamListener 时,对 KafkaListenerContainerFactory 的自定义会反映在生成的 KafkaMessageListenerContainer 中吗?
问题描述
我正在使用带有 kafka binder 的 spring-cloud-stream 来使用来自 kafka 的消息。该应用程序基本上是使用来自 kafka 的消息并更新数据库。
在某些情况下,DB 出现故障(可能会持续数小时)或其他一些临时技术问题。由于在这些情况下,在有限的时间内重试消息然后将其移动到 DLQ 是没有意义的,所以当我们遇到某些类型的异常(例如 DBHostNotAvaialableException)时,我试图实现无限次数的重试
为了实现这一点,我尝试了两种方法(两种方法都面临问题) -
在这种方法中,尝试在配置 ConcurrentKafkaListenerContainerFactory bean 时在容器属性上设置错误处理程序,但错误处理程序根本没有被触发。在调试流程时,我在创建的 KafkaMessageListenerContainer 中实现了 errorHandler 字段,因此它们使用默认的 LoggingErrorHandler。下面是我的容器工厂 bean 配置 - 这种方法的 @StreamListener 方法与第二种方法相同,除了对消费者的搜索。
@Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(ConsumerFactory<String, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(kafkaConsumerFactory); factory.getContainerProperties().setAckOnError(false); ContainerProperties containerProperties = factory.getContainerProperties(); // even tried a custom implementation of RemainingRecordsErrorHandler but call never went in to the implementation factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler()); return factory; }
我在配置工厂 bean 时是否遗漏了一些东西,或者这个 bean 仅与 @KafkaListener 而不是 @StreamListener 相关?
第二种选择是尝试使用手动确认和查找来实现它,在 @StreamListener 方法中从标头获取确认和使用者,如果收到可重试的异常,我会使用 retrytemplate 进行一定次数的重试,当这些重试用尽时我会触发
consumer.seek()
. 下面的示例代码 -@StreamListener(MySink.INPUT) public void processInput(Message<String> msg) { MessageHeaders msgHeaders = msg.getHeaders(); Acknowledgment ack = msgHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); Consumer<?,?> consumer = msgHeaders.get(KafkaHeaders.CONSUMER, Consumer.class); Integer partition = msgHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class); String topicName = msgHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class); Long offset = msgHeaders.get(KafkaHeaders.OFFSET, Long.class); try { retryTemplate.execute( context -> { // this is a sample service call to update database which might throw retryable exceptions like DBHostNotAvaialableException consumeMessage(msg.getPayload()); return null; } ); } catch (DBHostNotAvaialableException ex) { // once retries as per retrytemplate are exhausted do a seek consumer.seek(new TopicPartition(topicName, partition), offset); } catch (Exception ex) { // if some other exception just log and put in dlq based on enableDlq property logger.warn("some other business exception hence putting in dlq "); throw ex; } if (ack != null) { ack.acknowledge(); }
}
这种方法的问题- 因为我正在执行 consumer.seek() 而可能有来自上次轮询的待处理记录,如果 DB 在此期间出现(因此出现故障),这些记录可能会被处理和提交。有没有办法在执行搜索时清除这些记录?
PS - 我们目前处于 2.0.3.RELEASE 版本的 spring boot 和 Finchley.RELEASE 或 spring 云依赖项(因此也不能使用否定确认等功能,目前无法升级)。
解决方案
Spring Cloud Stream 不使用容器工厂。我已经在这个答案中向你解释了这一点。
2.1 版引入了ListenerContainerCustomizer
,如果您添加该类型的 bean,它将在容器创建后被调用。
Spring Boot 2.0 在一年多前就已终止,不再受支持。
我提到的答案显示了如何使用反射来添加错误处理程序。
只有在你有max.poll.records=1
.
推荐阅读
- javascript - Firebase 存储:this.xhr_.upload.addEventListener 不是函数
- node.js - 如何使用 Node.js 和 express 设置 Google Domains 动态 DNS?
- r - ClusterFuture 与雪块
- javascript - Puppeteer 无法从 Crontab 执行,但在从终端手动执行时工作
- java - 我可以对亚马逊 IOT 按钮进行编程以执行打开的应用程序的击键吗?
- c++ - 使用 lambda 函数修改类
- python - 从html中提取一些文本
- ruby-on-rails - 为什么在类的类名和文件名不匹配的情况下Ruby不会抛出错误?
- python - 为最小交换交换列表元素时出现 Python 超时错误 2
- c# - 在 Unity ReorderableList 上重新排序嵌套属性