首页 > 解决方案 > GCP 云功能未正确接收/确认 PubSub 消息

问题描述

我在 Google Cloud Platform 中设置了一些数据处理工作流程。这些位置处理物理地址并返回一些关于它们的指标。工作流使用 Cloud Functions 和 PubSub 流的组合。

在工作流中使用一个 Google Cloud 函数时,某些消息不会从触发流中提取或被多次提取。我知道这在一定程度上是可以预料的。但是,这种情况经常发生。这足以导致某些地点被夸大 10 倍,而其他几个地点却没有结果。

我认为该callback功能没有正确确认消息,但我不确定要获得更可靠的消息接收和确认应该有什么不同。任何建议表示赞赏。

我用于检索指标的 GCP 云函数由 PubSub 流触发,并执行retrieve_location将数据发送到不同 PubSub 流的函数。该retrieve_location函数如下所示:

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

get_metrics函数从每条消息中获取有效负载,检索一些数据并将其发送到另一个流。此功能似乎按预期工作。

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

标签: pythongoogle-cloud-platformgoogle-cloud-functionsgoogle-cloud-pubsub

解决方案


与其在 Cloud Function 中设置第二个 Pub/Sub 订阅者,不如创建一个后台函数来订阅直接处理有效负载的主题,例如:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)

推荐阅读