首页 > 解决方案 > PubSub 在确认截止日期后未重新发送消息

问题描述

我有两个订阅者指向主题用例的相同订阅。如果订阅者花费的时间超过确认截止日期来确认消息,则根据文档 pub sub 重新传递消息。

我已经配置了默认值,即 10 秒。但处理完成和确认大约需要 1 分钟。下面是我的示例代码

public class SubscribeAsyncExample {

    private Subscriber subscriber = null;

    @PostConstruct
    public void init() throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String subscriptionId = "your-subscription-id";
        subscribeAsyncExample(projectId, subscriptionId);
    }

    public void subscribeAsyncExample(String projectId, String subscriptionId) {
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);

        // Instantiate an asynchronous message receiver.
        MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
            // Handle incoming message, then ack the received message.
            System.out.println("Id: " + message.getMessageId());
            System.out.println("Data: " + message.getData().toStringUtf8());
            int sleepingTime = 20000;
            System.out.println("sleepingTime:" + sleepingTime);
            try { 
                Thread.sleep(sleepingTime); 
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            consumer.ack();
            System.out.println("test completed");
        };
        try {
            subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
            // Start the subscriber.
            subscriber.startAsync().awaitRunning();
            System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
            // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
            // subscriber.awaitTerminated(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void preDestroy() throws Exception {
        // Shut down the subscriber after 30s. Stop receiving messages.
        subscriber.stopAsync();
    }

}```

Below is Response
20:53:24,300 INFO  [stdout] (Thread-128) Id: 1288313732423842
20:53:24,300 INFO  [stdout] (Thread-128) Data: abc13
**20:53:24,300 INFO**  [stdout] (Thread-128) sleepingTime:20000
**20:53:44,300 INFO**  [stdout] (Thread-128) test completed

标签: publish-subscribegoogle-cloud-pubsub

解决方案


使用Cloud Pub/Sub 客户端库时,截止日期会自动延长MaxAckExtensionPeriod Subscriber.Buider. 此延长期限默认为一小时。要更改此值,您需要更改创建订阅者的行,如下所示:

subscriber = Subscriber.newBuilder(subscriptionName, receiver)
    .setMaxAckExtensionPeriod(Duration.ofSeconds(10))
    .build();

推荐阅读