publish-subscribe - PubSub 在确认截止日期后未重新发送消息
问题描述
我有两个订阅者指向主题用例的相同订阅。如果订阅者花费的时间超过确认截止日期来确认消息,则根据文档 pub sub 重新传递消息。
我已经配置了默认值,即 10 秒。但处理完成和确认大约需要 1 分钟。下面是我的示例代码
public class SubscribeAsyncExample {
private Subscriber subscriber = null;
@PostConstruct
public void init() throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeAsyncExample(projectId, subscriptionId);
}
public void subscribeAsyncExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
int sleepingTime = 20000;
System.out.println("sleepingTime:" + sleepingTime);
try {
Thread.sleep(sleepingTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
consumer.ack();
System.out.println("test completed");
};
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
// subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
@PreDestroy
public void preDestroy() throws Exception {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}```
Below is Response
20:53:24,300 INFO [stdout] (Thread-128) Id: 1288313732423842
20:53:24,300 INFO [stdout] (Thread-128) Data: abc13
**20:53:24,300 INFO** [stdout] (Thread-128) sleepingTime:20000
**20:53:44,300 INFO** [stdout] (Thread-128) test completed
解决方案
使用Cloud Pub/Sub 客户端库时,截止日期会自动延长MaxAckExtensionPeriod
至Subscriber.Buider
. 此延长期限默认为一小时。要更改此值,您需要更改创建订阅者的行,如下所示:
subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setMaxAckExtensionPeriod(Duration.ofSeconds(10))
.build();
推荐阅读
- rest - 嵌套资源的 REST url
- sql-server - 用于数据导入的 CSV 模板
- oracle - Oracle:格式化数字
- php - 如何检查变量是否“可循环”?
- node.js - nginx proxy_pass 不适用于本地主机
- ios - 后续视图中的 UINavigationItem 标题不会更新
- python - 如何在folium.circile地图python中的每个圆圈上添加标签
- opencv - 在 GPU 内存中保留关键点以实现高效的流水线操作
- javascript - momentjs 没有以不同的输入格式返回相同时区的相同输出
- python - 两个进程想要访问同一个 python 文件