spring - pubsub 中订阅者的重试设置是什么以及如何在 spring 应用程序中正确设置它们?
问题描述
我有一个春季服务订阅来自谷歌云发布订阅(拉)中的主题的消息。它通常工作正常。但我想对重新发送的消息有更多的控制权。我的服务有时需要对消息进行回复,或者只是让 ackDeadline 通过,以便我稍后再次收到消息。在使用单个消息进行测试时,nacked 消息几乎立即返回给我,而我根本不 ack 或 nack 的消息,在 ackDeadline 默认 10 秒后返回。我希望它可以推迟重复使用这些消息。我认为重试设置是为这种情况设计的。我还应该提到,我目前正在使用模拟器在本地进行测试并从代码创建订阅。我正在使用 PubSubAdmin 进行管理。
根据这个文档,我试图在我的配置文件配置中设置这些配置。像这样:
spring.cloud.gcp.pubsub.subscriber.retry.initial-retry-delay-second: 4
spring.cloud.gcp.pubsub.subscriber.retry.max-attempts: 5
spring.cloud.gcp.pubsub.subscriber.retry.initial-rpc-timeout-seconds: 4
spring.cloud.gcp.pubsub.subscriber.retry.max-rpc-timeout-seconds: 8
spring.cloud.gcp.pubsub.subscriber.retry.max-retry-delay-seconds: 7
spring.cloud.gcp.pubsub.subscriber.retry.total-timeout-seconds: 3000
但它对消息重新出现的时间没有影响。我是否错误地理解了重试设置的含义?也许它们仅在存在某些连接问题时才生效,而不是在 nacking 或缺少确认的情况下?或者我是否必须在使用部署管理器创建订阅时设置设置,并且不允许从代码中设置它们?或者也许将它们设置在(开发)配置文件配置中不适用于 PubSubAdmin?感谢您的任何建议!
编辑:我希望第一次重试在 5 秒后发生,但下次重试 10 秒,等等。另外我想设置最大重试次数。所以我不感兴趣的是将 ackDeadline 设置为更大的数字。
编辑2:为什么nacking:其中一项服务(我们称之为桥接器)正在订阅消息,必须验证每条消息,如果可以,将其传递给另一个外部系统。该服务充当该系统的桥梁,因为我们不能直接在第二个系统上工作。在某些情况下,消息需要一些额外的信息,所以桥会尝试在其他地方获取它(包括很多微服务),有时会发生这种情况,此时额外的信息不存在(还)。所以第一个想法是不确认消息,让它稍后再次出现。但我不想在接下来的 7 天内每 10 秒询问一次(使用 ackDeadline),我只想尝试几次,如果 2 小时后不存在,它将永远不会出现。所以我们尝试 nack 并希望这些重试设置可以帮助管理重新发送。但是因为他们没有,我想唯一的方法是自己构建一些东西来管理桥中的这些消息。也许存储消息 ID 和重试次数,以便我可以在例如 5 次后确认并将消息推送到另一个主题以不同方式处理它。或者有没有更好的解决方案?
解决方案
Cloud Pub/Sub 不为特定消息提供指数退避。除了告诉 Cloud Pub/Sub 您无法处理该消息外,nack 没有任何作用。
如果您要记录为什么需要取消消息,我可以提供更有用的答案。如果您无法处理当前负载,您可以使用此处描述的流控制选项来减少发送给客户端的未完成消息或字节数。如果您有已知错误的消息,则应该在推送到另一个死信主题后对它们进行确认,以单独处理。
对编辑 2 的回应:
如果您有这种补充消息的操作可能失败的情况,请在您的服务中自己对该操作实施您想要的任何退避机制。在构造订阅者时设置最大 ack 延长期(java 中的 setMaxAckExtensionPeriod),以确保您的客户端将每条消息的 ack 截止日期延长到足够长的时间,以进行重试链。
编辑 2
请注意,Pub/Sub 现在内置了对Dead Lettering的支持。
推荐阅读
- python - 如何更新有重复的json文件的一部分
- javascript - React Server 组件(RSC)可以有生命周期吗?
- wxpython - 我应该如何从 wxPython 的 FormatISOCombined(self, sep='T') 函数文档中读取 sep='T'?
- three.js - 仅沿曲线/多段线垂直拉伸几何图形
- python-3.x - 我需要帮助确定我应该将 IF 条件放在代码块中的哪个位置
- android - 为什么 viewModelScope.launch 中的代码在收集后结束?
- json - 使用 Neo4j 存储 json 数据
- javascript - 带有 --prod 标志的 Ionic 5 构建失败 - 类型上不存在属性“翻译”
- c# - 搜索文件时如何从 C# 中的 UnauthorizedAccessException 错误继续
- git - 如何在 Visual Studio 2019 v.16.8.xxx 中禁用 Git 源代码管理?