首页 > 解决方案 > 使用 GCP PubSub 的 Spring Cloud Stream 消费者的退避策略

问题描述

在 Spring Boot 应用程序中,我使用 spring-cloud-stream for PubSub (spring-cloud-gcp-pubsub-stream-binder) 来订阅主题。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
</dependency>

我使用@EnableBindingand@StreamListener注释来设置订阅者:

@EnableBinding(Sink.class)
class Subscriber {

    @StreamListener(INPUT)
    public void handleMessage(Message<String> message) {
        ...
    }
}

在处理消息期间,可能会出现问题。在这种情况下,我会抛出一个异常以确保消息不会被确认并在以后重试。

根据spring cloud stream 文档,我应该能够使用这些属性

spring.cloud.stream.default.consumer.defaultRetryable=true
spring.cloud.stream.default.consumer.backOffInitialInterval=1000
spring.cloud.stream.default.consumer.backOffMultiplier=2.0
spring.cloud.stream.default.consumer.backOffMaxInterval=300000
spring.cloud.stream.default.consumer.maxAttempts=9999

或针对特定通道(在这种情况下为输入)

spring.cloud.stream.bindings.input.consumer.defaultRetryable=true
spring.cloud.stream.bindings.input.consumer.backOffInitialInterval=1000
spring.cloud.stream.bindings.input.consumer.backOffMultiplier=2.0
spring.cloud.stream.bindings.input.consumer.backOffMaxInterval=300000
spring.cloud.stream.bindings.input.consumer.maxAttempts=9999

但是这些属性似乎没有在我的应用程序中使用。无论上述属性中使用什么值,消息都会每 100 毫秒重试一次。

谁能帮我设置正确的重试和/或退避设置,以便相应地重试消息?

可以在GitHub 上找到一个完整的最小示例来说明我的问题,如下所示:

制片人:

@Component
public class Main {

    private static final Logger LOG = getLogger(Main.class);

    private boolean firstExecution = true;

    @Autowired
    private SuccessSwitch consumerSuccessSwitch;
    @Autowired
    private PubSubTemplate pubSubTemplate;

    @Scheduled(fixedDelay = 10000)
    public void doSomethingAfterStartup() {
        if (firstExecution) {
            firstExecution = false;
            consumerSuccessSwitch.letFail();

            pubSubTemplate.publish("topic", "payload");
            LOG.info("Message published");
        } else {
            consumerSuccessSwitch.letSucceed();
        }
    }
}

消费者:

@EnableBinding(Sink.class)
class Subscriber {

    private static final Logger LOG = getLogger(Subscriber.class);

    @Autowired
    private SuccessSwitch successSwitch;
    private int retryCounter = 0;

    @StreamListener(INPUT)
    public void handleMessage(Message<String> message) {
        LOG.info("Received: {} for the {} time", message.getPayload(), ++retryCounter);

        if (!successSwitch.succeeded()) {
            throw new RuntimeException();
        }
        LOG.info("Received: {} times", retryCounter);
    }
}

在消费者中切换 ack/nack:

@Component
public class SuccessSwitch {

    private boolean success = false;

    public void letSucceed() {
        this.success = true;
    }

    public void letFail() {
        this.success = false;
    }

    public boolean succeeded() {
        return success;
    }
}

标签: javaspring-bootgoogle-cloud-platformspring-cloud-streamgoogle-cloud-pubsub

解决方案


推荐阅读