java - 使用 GCP PubSub 的 Spring Cloud Stream 消费者的退避策略
问题描述
在 Spring Boot 应用程序中,我使用 spring-cloud-stream for PubSub (spring-cloud-gcp-pubsub-stream-binder) 来订阅主题。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
</dependency>
我使用@EnableBinding
and@StreamListener
注释来设置订阅者:
@EnableBinding(Sink.class)
class Subscriber {
@StreamListener(INPUT)
public void handleMessage(Message<String> message) {
...
}
}
在处理消息期间,可能会出现问题。在这种情况下,我会抛出一个异常以确保消息不会被确认并在以后重试。
根据spring cloud stream 文档,我应该能够使用这些属性
spring.cloud.stream.default.consumer.defaultRetryable=true
spring.cloud.stream.default.consumer.backOffInitialInterval=1000
spring.cloud.stream.default.consumer.backOffMultiplier=2.0
spring.cloud.stream.default.consumer.backOffMaxInterval=300000
spring.cloud.stream.default.consumer.maxAttempts=9999
或针对特定通道(在这种情况下为输入)
spring.cloud.stream.bindings.input.consumer.defaultRetryable=true
spring.cloud.stream.bindings.input.consumer.backOffInitialInterval=1000
spring.cloud.stream.bindings.input.consumer.backOffMultiplier=2.0
spring.cloud.stream.bindings.input.consumer.backOffMaxInterval=300000
spring.cloud.stream.bindings.input.consumer.maxAttempts=9999
但是这些属性似乎没有在我的应用程序中使用。无论上述属性中使用什么值,消息都会每 100 毫秒重试一次。
谁能帮我设置正确的重试和/或退避设置,以便相应地重试消息?
可以在GitHub 上找到一个完整的最小示例来说明我的问题,如下所示:
制片人:
@Component
public class Main {
private static final Logger LOG = getLogger(Main.class);
private boolean firstExecution = true;
@Autowired
private SuccessSwitch consumerSuccessSwitch;
@Autowired
private PubSubTemplate pubSubTemplate;
@Scheduled(fixedDelay = 10000)
public void doSomethingAfterStartup() {
if (firstExecution) {
firstExecution = false;
consumerSuccessSwitch.letFail();
pubSubTemplate.publish("topic", "payload");
LOG.info("Message published");
} else {
consumerSuccessSwitch.letSucceed();
}
}
}
消费者:
@EnableBinding(Sink.class)
class Subscriber {
private static final Logger LOG = getLogger(Subscriber.class);
@Autowired
private SuccessSwitch successSwitch;
private int retryCounter = 0;
@StreamListener(INPUT)
public void handleMessage(Message<String> message) {
LOG.info("Received: {} for the {} time", message.getPayload(), ++retryCounter);
if (!successSwitch.succeeded()) {
throw new RuntimeException();
}
LOG.info("Received: {} times", retryCounter);
}
}
在消费者中切换 ack/nack:
@Component
public class SuccessSwitch {
private boolean success = false;
public void letSucceed() {
this.success = true;
}
public void letFail() {
this.success = false;
}
public boolean succeeded() {
return success;
}
}
解决方案
推荐阅读
- python - 如何在bs4中的一行中获取相同类名的所有值
- c++ - 无法使用 CMAKE 将 bulletphysics 编译为静态库,用于 android-ndk C++ 项目
- c++ - 同一主机下的C++串行通信
- css - 更改为移动布局时,material-ui 响应更改
- button - Google 登录按钮更改 Android Studio
- python - pyCharm 无法打开禁用“在工具窗口中显示绘图”的绘图 - 可能是由于阻塞
- .net-core - GtkSharp 的 .NET Core / 5.0 Nuget 库中是否有 Gdk.PangoRenderer 的实现?
- python - 如何解决 pip install 期间发生的身份验证过程
- azure - 备份 Azure 数据库时如何备份事务日志?
- macos - SwiftUI NavigationView 与 macOS 上的窗口内混合