首页 > 解决方案 > 让 python web API 一次只运行一个?

问题描述

我想制作一个 python Azure Function App (web API) 来处理任务队列。我已经设置了一些触发器,只要将任务插入队列,就会调用这个 API。由于此 API 将处理队列中的所有当前任务,因此如果此时此 API 有其他执行,我想阻止该 API 执行,以避免处理冲突。

我想使用数据库锁定机制,但它看起来并不那么优雅。是否有任何单例设计模式可以在 Python Azure 函数 App 中用于此目的?谢谢。

标签: python

解决方案


我找到了一种使用 Azure Durable 功能解决此问题的方法。Azure Durable Function 应用中有 3 种类型的函数:Orchestration Client 函数、Orchestrator 函数、Activity 函数。我只需要在 Orchestration Client 函数中添加一些检查步骤,如下例所示:

# This function an HTTP starter function for Durable Functions.
import logging
import azure.functions as func
import azure.durable_functions as df

def is_finished(runtime_status : df.models.OrchestrationRuntimeStatus):
    result = False
    if runtime_status is None or \
       runtime_status in [df.OrchestrationRuntimeStatus.Canceled,
                          df.OrchestrationRuntimeStatus.Completed,
                          df.OrchestrationRuntimeStatus.Failed,
                          df.OrchestrationRuntimeStatus.Terminated]:
        result = True
    return result

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:

    client = df.DurableOrchestrationClient(starter)
    # general azure function url : http://<APP_NAME>.azurewebsites.net/api/<FUNCTION_NAME>
    # function.json -> "route": "orchestrators/{functionName}/{instanceId}"

    orchestrator_instance_id = req.route_params['instanceId']
    function_name = req.route_params['functionName']

    INVENSYNC_ORCHESTRATOR_INSTANCE_ID = '117610EF-BC37-4E31-BFA4-205EBB3CC54E' # just select any key

    if orchestrator_instance_id == INVENSYNC_ORCHESTRATOR_INSTANCE_ID:
    
        existing_instance_status = await client.get_status(orchestrator_instance_id)

        logging.info(f"InventorySyncHttpStart() - existing_instance_status = '{existing_instance_status}'.")
        if existing_instance_status is None or \
           is_finished(existing_instance_status.runtime_status):

            logging.info(f"InventorySyncHttpStart() - existing_instance_status.runtime_status = '{existing_instance_status.runtime_status}'.")
            orchestrator_instance_id = await client.start_new(function_name, orchestrator_instance_id)

            logging.info(f"Started orchestration with ID = '{orchestrator_instance_id}'.")

            result = client.create_check_status_response(req, orchestrator_instance_id)
        else:
            result = func.HttpResponse(status_code=409, body=f"An instance with ID '{orchestrator_instance_id}' already exists")

    else:
        result = func.HttpResponse(status_code=406, body=f"Invalid Instance ID '{orchestrator_instance_id}' in URL")

    return result

推荐阅读