首页 > 解决方案 > Python 中的 Azure 持久函数:为什么没有触发函数 2?

问题描述

我的用例如下:

  1. F1:生成一些数据并将它们写入 CosmosDB(使用时间触发器)
  2. F2:读取刚刚写入的数据并添加用户名
  3. Orchestrator:控制工作流程并在 F1 完成后调用 F2

我的问题:只有 F1 有效,但 F2 根本没有触发。为什么?F1是否必须返回触发器或其他东西?

这就是我知道只执行 F1 的方式: 在此处输入图像描述

F1

import logging
import hashlib
import time
import datetime
from azure.cosmos import CosmosClient
import azure.functions as func

def generate_id(string=None, length=10):
    '''This function generates a hash id to be attached to each new row'''

    ts = time.time()
    guid = hashlib.shake_128((str(string) + str(ts)).encode()).hexdigest(10)
    return guid

def main(mytimer: func.TimerRequest, outputDocument: func.Out[func.Document]) -> None:

    utc_timestamp = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()

    if mytimer.past_due:
        logging.info('The timer is past due')

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    result1 = {
    "first_letter": "A",
    "second_letter": "B",
    "third_letter": "C",
    "score": 0.001,
    }

    result1['id'] = generate_id()

    outputDocument.set(func.Document.from_dict(result1))

    return

F1 函数.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "*/30 * * * * *"
    },
    {
      "type": "cosmosDB",
      "direction": "out",
      "name": "outputDocument",
      "databaseName": myCosmosDB,
      "collectionName": myContainer,
      "createIfNotExists": "true",
      "connectionStringSetting": myConnString,
      "partitionKey": "id"
    }
  ]
}

F2

import logging
import azure.functions as func
from azure.cosmos import CosmosClient

def add_username(string=None):
    '''Generate username'''

    name = "MyName"
    surname = "MySurname"
    username = name+" "+surname

    return username


def main(F1activitytrigger, inputDocument: func.DocumentList) -> str:

    if inputDocument:
        logging.info('Document id: %s', inputDocument[0]['id'])

    result2 = inputDocument[0].data

    result2['username'] = add_username() 

    return result2

F2 函数.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "F1activitytrigger",
      "type": "activityTrigger",
      "direction": "in"
    },
    {
      "type": "cosmosDB",
      "direction": "in",
      "name": "inputDocument",
      "databaseName": myCosmosDB,
      "collectionName": myContainer,
      "createIfNotExists": "true",
      "connectionStringSetting": myConnString,
      "partitionKey": "id"     
    }
  ]
}

编排器

import logging
import json

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity('Test-F1')
    result2 = yield context.call_activity('Test-F2')
    return [result1, result2]

main = df.Orchestrator.create(orchestrator_function)

Orchestrator function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

标签: pythonpython-3.xazureazure-durable-functions

解决方案


根据您的描述,F1 是按计划触发的函数(类型为“timerTrigger”)。

F1 不需要 Orchestrator 来调用它,但 F2 需要。

您需要在 Orchestrator 中调用 F2,因为 F2 的类型是“activityTrigger”。

因此,您的持久功能应如下所示:

F1

import logging

import azure.functions as func
import azure.durable_functions as df


async def main(mytimer: func.TimerRequest, starter: str) -> None:
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new("YourOrchestratorName", None, None)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "* * * * * *"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

YourOrchestratorName

import logging
import json

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity('F2', "Tokyo")
    result2 = yield context.call_activity('F2', "Seattle")
    result3 = yield context.call_activity('F2', "London")
    return [result1, result2, result3]

main = df.Orchestrator.create(orchestrator_function)

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

F2

import logging


def main(name: str) -> str:
    logging.info(f"Hello {name}!")
    return f"Hello {name}!"

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "name",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}

推荐阅读