python - Google pubsub 延迟确认
问题描述
我在 GKE 上部署了一个应用程序,在不同的微服务中分开。其中一个微服务,我们称之为“worker”,从发布订阅消息中接收要执行的任务。
执行这些任务最多可能需要 1 小时。Google pubsub 消息的常规确认截止日期非常短,我们在截止日期结束前每 10 秒更新一次截止日期。这是负责此的代码:
def watchdog(businessDoneEvent, subscription, ack_deadline, message, ack_id):
'''
Prevents message from being republished as long as computation is
running
'''
while True:
# Wait (defaultDeadline - 10) seconds before renewing if defaultDeadline
# is > 5 seconds; renewed every second otherwise
sleepTime = ack_deadline - 10 if ack_deadline > 10 else 1
startTime = time.time()
while time.time() - startTime < sleepTime:
LOGGER.info('Sleeping time: {} - ack_deadline: {}'.format(time.time() - startTime, ack_deadline))
if businessDoneEvent.isSet():
LOGGER.info('Business done!')
return
time.sleep(1)
subscriber = SubscriberClient()
LOGGER.info('Modifying ack deadline for message ' +
str(message.data) + ' processing to ' +
str(ack_deadline))
subscriber.modify_ack_deadline(subscription, [ack_id],
ack_deadline)
执行结束后,我们到达这段代码:
def callbackWrapper(callback,
subscription,
message,
ack_id,
endpoint,
context,
subscriber,
postAcknowledgmentCallback=None):
'''
Pub/sub message acknowledgment if everything ran correctly
'''
try:
callback(message.data, endpoint, context, **message.attributes)
except Exception as e:
LOGGER.info(message.data)
LOGGER.error(traceback.format_exc())
raise e
else:
LOGGER.info("Trying to acknowledge...")
my_retry = Retry(predicate=if_exception_type(ServiceUnavailable), deadline=3600)
subscriber.acknowledge(subscription, [ack_id], retry=my_retry)
LOGGER.info(str(ack_id) + ' has been acknowledged')
if postAcknowledgmentCallback is not None:
postAcknowledgmentCallback(message.data,
**message.attributes)
请注意,我们也在大多数微服务中使用此代码,它工作得很好。
我的问题是,即使我没有从这段代码中得到任何错误,而且似乎确认请求已正确发送,但实际上稍后会确认。例如,根据 GCP 控制台,现在我有 8 条未确认的消息,但我应该只有 3 条。它还说有 12 条,而我一个小时应该只有 5 条:
我有一个使用 pubsub 指标的水平 pod 自动缩放器。豆荚完成后,它们不会按比例缩小,或者仅在 1 小时或更长时间之后。这会产生一些我想避免的无用成本。
有谁知道为什么会这样?
解决方案
推荐阅读
- java - 如果图书馆有一些副作用,这可以吗?
- python - 2D numpy 数组的形状不正确
- linux - Shell:将变量插入命令
- node.js - Typescript Cipher.getAuthTag 不存在
- powershell - 如何使此组过滤脚本仅适用于活动用户
- vue.js - 可以在 vue.js 应用程序中排除 `this` 关键字吗?
- bitbucket - 源代码树和 git 中的 Bitbucket 凭据问题
- java - Java lambda 交集两个列表并从结果中删除
- laravel - 在其他类的静态方法中调用重定向
- c# - Directory.CreateDirectory 创建一个 Visual Studio 无法访问的文件夹