首页 > 解决方案 > Google Cloud PubSub:未从 Cloud Functions 发送/接收所有消息


摘要:我的客户端代码通过向 Pub/Sub 主题发布消息来触发 861 后台 Google Cloud Function。每个 Cloud Function 执行一项任务,将结果上传到 Google Storage,并将消息发布到客户端代码正在侦听的另一个 Pub/Sub 主题。尽管所有 Cloud Functions 都已执行(通过 Google Storage 中的结果数量进行验证),但客户端代码并未收到所有消息。

服务器端:我有一个后台 Google Cloud 功能,每次将消息发布到 TRIGGER Pub/Sub 主题时都会触发该功能。消息数据的自定义属性充当函数参数,具体取决于函数执行特定任务。然后,它将结果上传到 Google Storage 中的存储桶,并向 RESULTS Pub/Sub 主题(与用于触发此功能的主题不同)发布一条消息(带有 taskID 和执行时间详细信息)。

客户端:我需要执行 861 个不同的任务,这需要使用 861 个略有不同的输入调用云函数。这些任务是相似的,云函数执行它们需要 20 秒到 2 分钟(中位数约为 1 分钟)。为此,我创建了一个从 Google Cloud Shell(或本地机器 shell)运行的 python 脚本。客户端 python 脚本向 TRIGGER Pub/Sub 主题发布 861 条消息,该主题同时触发尽可能多的 Cloud Functions,每个消息都传递一个唯一的 taskID,范围为 [0, 860]。然后,客户端 python 脚本以“同步拉取”方式轮询 RESULTS Pub/Sub 主题以获取任何消息。Cloud Function 在执行任务后使用唯一的 taskID 和时间详细信息将消息发布到 RESULTS Pub/Sub 主题。客户端使用这个唯一的 taskID 来识别消息来自哪个任务。它还有助于识别被丢弃的重复消息。


  1. 客户端 python 脚本向 TRIGGER Pub/Sub 主题发布 861 条消息(每条消息都有唯一的 taskID),并等待来自 Cloud Function 的结果消息。
  2. 调用 861 个不同的 Cloud Functions,每个 Cloud Functions 执行一个任务,将结果上传到 Google Storage,并将消息(带有 taskID 和执行时间详细信息)发布到 RESULTS Pub/Sub 主题。
  3. 客户端同步抓取所有消息并将任务标记为完成。

问题:当客户端轮询来自 RESULTS Pub/Sub 主题的消息时,我没有收到所有 taskID 的消息。我确信云函数被正确调用和执行(我在谷歌存储桶中有 861 个结果)。我重复了很多次,每次都发生。奇怪的是,丢失的 taskID 的数量每次都会改变,并且不同的 taskID 在不同的运行中会丢失。我还在跟踪收到的重复 taskID 的数量。表中给出了 5 次独立运行的唯一 taskID 接收、丢失和重复的数量。

SN   # of Tasks  Received  Missing  Repeated
1     861          860      1        25
2     861          840      21       3
3     861          851      10       1
4     861          837      24       3
5     861          856      5        1

我不确定这个问题可能来自哪里。鉴于数字的随机性以及丢失的任务 ID,我怀疑 Pub/Sub 至少一次交付逻辑中存在一些错误。如果在云函数中,我睡了几秒钟而不是执行任务,例如使用 time.sleep(5),那么一切正常(我在客户端收到所有 861 taskID)。


以下main.pyrequirements.txt作为 Google Cloud Function 部署client.py的客户端代码。运行具有 100 个并发任务的客户端python client.py 100,重复 5 次。每次丢失不同数量的 taskID。




This file is deployed as Google Cloud Function. This function starts,
sleeps for some seconds and pulishes back the taskID.

    gcloud functions deploy gcf_run --runtime python37 --trigger-topic <TRIGGER_TOPIC> --memory=128MB --timeout=300s

import time
from random import randint
from google.cloud import pubsub_v1

# Global variables
project_id = "<Your Google Cloud Project ID>"  # Your Google Cloud Project ID
topic_name = "<RESULTS_TOPIC>"  # Your Pub/Sub topic name

def gcf_run(data, context):
    """Background Cloud Function to be triggered by Pub/Sub.
         data (dict): The dictionary with data specific to this type of event.
         context (google.cloud.functions.Context): The Cloud Functions event

    # Message should contain taskID (in addition to the data)
    if 'attributes' in data:
        attributes = data['attributes']
        if 'taskID' in attributes:
            taskID = attributes['taskID']
            print('taskID missing!')
        print('attributes missing!')

    # Sleep for a random time beteen 30 seconds to 1.5 minutes
    print("Start execution for {}".format(taskID))
    sleep_time = randint(30, 90)  # sleep for this many seconds
    time.sleep(sleep_time)  # sleep for few seconds

    # Marks this task complete by publishing a message to Pub/Sub.
    data = u'Message number {}'.format(taskID)
    data = data.encode('utf-8')  # Data must be a bytestring
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)
    publisher.publish(topic_path, data=data, taskID=taskID)



The client code creates the given number of tasks and publishes to Pub/Sub,
which in turn calls the Google Cloud Functions concurrently.
    python client.py 100

from __future__ import print_function
import sys
import time
from google.cloud import pubsub_v1

# Global variables
project_id = "<Google Cloud Project ID>" # Google Cloud Project ID
topic_name = "<TRIGGER_TOPIC>"    # Pub/Sub topic name to publish
subscription_name = "<subscriber to RESULTS_TOPIC>"  # Pub/Sub subscription name
num_experiments = 5  # number of times to repeat the experiment
time_between_exp = 120.0 # number of seconds between experiments

# Initialize the Publisher (to send commands that invoke Cloud Functions)
# as well as Subscriber (to receive results written by the Cloud Functions)
# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
    max_latency=1,   # One second
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

class Task:
    A task which will execute the Cloud Function once.

        taskID (int)       : A unique number given to a task (starting from 0).
        complete (boolean) : Flag to indicate if this task has completed.
    def __init__(self, taskID):
        self.taskID = taskID
        self.complete = False

    def start(self):
        Start the execution of Cloud Function by publishing a message with
        taskID to the Pub/Sub topic.
        data = u'Message number {}'.format(self.taskID)
        data = data.encode('utf-8')  # Data must be a bytestring
        publisher.publish(topic_path, data=data, taskID=str(self.taskID))

    def end(self):
        Mark the end of this task.
            Returns (boolean):
                True if normal, False if task was already marked before.
        # If this task was not complete, mark it as completed
        if not self.complete:
            self.complete = True
            return True

        return False
    # [END of Task Class]

def createTasks(num_tasks):
    Create a list of tasks and return it.
            num_tasks (int) : Number of tasks (Cloud Function calls)
        Returns (list):
            A list of tasks.
    all_tasks = list()
    for taskID in range(0, num_tasks):

    return all_tasks

def receiveResults(all_tasks):
    Receives messages from the Pub/Sub subscription. I am using a blocking
    Synchronous Pull instead of the usual asynchronous pull with a callback
    funtion as I rely on a polling pattern to retrieve messages.
    See: https://cloud.google.com/pubsub/docs/pull
            all_tasks (list) : List of all tasks.
    num_tasks = len(all_tasks)
    total_msg_received = 0  # track the number of messages received
    NUM_MESSAGES = 10  # maximum number of messages to pull synchronously
    TIMEOUT = 600.0    # number of seconds to wait for response (10 minutes)

    # Keep track of elapsed time and exit if > TIMEOUT
    __MyFuncStartTime = time.time()
    __MyFuncElapsedTime = 0.0

    print('Listening for messages on {}'.format(subscription_path))
    while (total_msg_received < num_tasks) and (__MyFuncElapsedTime < TIMEOUT):
        # The subscriber pulls a specific number of messages.
        response = subscriber.pull(subscription_path,
            max_messages=NUM_MESSAGES, timeout=TIMEOUT, retry=None)
        ack_ids = []

        # Keep track of all received messages
        for received_message in response.received_messages:
            if received_message.message.attributes:
                attributes = received_message.message.attributes
                taskID = int(attributes['taskID'])
                if all_tasks[taskID].end():
                    # increment count only if task completes the first time
                    # if False, we received a duplicate message
                    total_msg_received += 1
                #     print("Received taskID = {} ({} of {})".format(
                #         taskID, total_msg_received, num_tasks))
                # else:
                #     print('REPEATED: taskID {} was already marked'.format(taskID))
                print('attributes missing!')


        # Acknowledges the received messages so they will not be sent again.
        if ack_ids:
            subscriber.acknowledge(subscription_path, ack_ids)

        time.sleep(0.2)  # Wait 200 ms before polling again
        __MyFuncElapsedTime = time.time() - __MyFuncStartTime
        # print("{} s elapsed. Listening again.".format(__MyFuncElapsedTime))

    # if total_msg_received != num_tasks, function exit due to timeout
    if total_msg_received != num_tasks:
        print("WARNING: *** Receiver timed out! ***")
    print("Received {} messages out of {}. Done.".format(
        total_msg_received, num_tasks))

def main(num_tasks):
    Main execution point of the program

    for experiment_num in range(1, num_experiments + 1):
        print("Starting experiment {} of {} with {} tasks".format(
            experiment_num, num_experiments, num_tasks))
        # Create all tasks and start them
        all_tasks = createTasks(num_tasks)
        for task in all_tasks:     # Start all tasks
        print("Published {} taskIDs".format(num_tasks))

        receiveResults(all_tasks)  # Receive message from Pub/Sub subscription

        print("Waiting {} seconds\n\n".format(time_between_exp))
        time.sleep(time_between_exp)  # sleep between experiments

if __name__ == "__main__":
    if(len(sys.argv) != 2):
        print("usage: python client.py  <num_tasks>")
        print("    num_tasks: Number of concurrent Cloud Function calls")

    num_tasks = int(sys.argv[1])

publisher.publish(topic_path, data=data, taskID=taskID)

您不会等待 publisher.publish 返回的未来。这意味着您无法保证当您从gcf_run函数结束时发布到该主题实际上已经发生,但是 TRIGGER 主题云函数订阅上的消息无论如何都是 ACK-ed。


publisher.publish(topic_path, data=data, taskID=taskID).result()

