首页 > 解决方案 > Google pubsub 延迟确认

问题描述

我在 GKE 上部署了一个应用程序,在不同的微服务中分开。其中一个微服务,我们称之为“worker”,从发布订阅消息中接收要执行的任务。

执行这些任务最多可能需要 1 小时。Google pubsub 消息的常规确认截止日期非常短,我们在截止日期结束前每 10 秒更新一次截止日期。这是负责此的代码:

def watchdog(businessDoneEvent, subscription, ack_deadline, message, ack_id):
    '''
        Prevents message from being republished as long as computation is 
        running
    '''
    while True:
        # Wait (defaultDeadline - 10) seconds before renewing if defaultDeadline
        # is > 5 seconds; renewed every second otherwise
        sleepTime = ack_deadline - 10 if ack_deadline > 10 else 1
        startTime = time.time()
        while time.time() - startTime < sleepTime:
            LOGGER.info('Sleeping time: {} - ack_deadline: {}'.format(time.time() - startTime, ack_deadline))
            if businessDoneEvent.isSet():
                LOGGER.info('Business done!')
                return
            time.sleep(1)

        subscriber = SubscriberClient()

        LOGGER.info('Modifying ack deadline for message ' + 
                          str(message.data) + ' processing to ' +
                          str(ack_deadline))
        subscriber.modify_ack_deadline(subscription, [ack_id],
                                         ack_deadline)

执行结束后,我们到达这段代码:

def callbackWrapper(callback,
                    subscription,
                    message,
                    ack_id,
                    endpoint,
                    context,
                    subscriber,
                    postAcknowledgmentCallback=None):
    '''
        Pub/sub message acknowledgment if everything ran correctly
    '''
    try:
        callback(message.data, endpoint, context, **message.attributes)
    except Exception as e:
        LOGGER.info(message.data)
        LOGGER.error(traceback.format_exc())
        raise e
    else:
        LOGGER.info("Trying to acknowledge...")
        my_retry = Retry(predicate=if_exception_type(ServiceUnavailable), deadline=3600)
        subscriber.acknowledge(subscription, [ack_id], retry=my_retry)
        LOGGER.info(str(ack_id) + ' has been acknowledged')
        if postAcknowledgmentCallback is not None:
            postAcknowledgmentCallback(message.data, 
                                       **message.attributes)

请注意,我们也在大多数微服务中使用此代码,它工作得很好。

我的问题是,即使我没有从这段代码中得到任何错误,而且似乎确认请求已正确发送,但实际上稍后会确认。例如,根据 GCP 控制台,现在我有 8 条未确认的消息,但我应该只有 3 条。它还说有 12 条,而我一个小时应该只有 5 条: 在此处输入图像描述

我有一个使用 pubsub 指标的水平 pod 自动缩放器。豆荚完成后,它们不会按比例缩小,或者仅在 1 小时或更长时间之后。这会产生一些我想避免的无用成本。

有谁知道为什么会这样?

标签: pythongoogle-cloud-platformgoogle-kubernetes-enginegoogle-cloud-pubsub

解决方案


推荐阅读