首页 > 解决方案 > 在 Durable Function 中等待数据工厂管道完成

问题描述

我有许多要执行的 Azure 数据工厂管道。管道层次结构本质上是一个 DAG,因此我可以使用拓扑排序按顺序执行它们。

我正在考虑使用 Durable Functions 执行此操作,因为它们似乎适合用例。但是,我不知道如何让编排器知道特定管道已完成。我不希望对管道进行修改,例如最后调用接受/拒绝函数。

我知道我可以执行以下代码之类的操作,但这是否违反有关持久功能的最佳实践?

var pipelineRunId = await context.CallActivityAsync<string>("StartPipeline", pipelineId);
var hasFinished = false;
while(!hasFinished)
{
    var fireAt = context.CurrentUtcDateTime.AddSeconds(30);
    await context.CreateTimer(fireAt, CancellationToken.None);
    hasFinished = await context.CallActivityAsync<bool>("CheckPipelineStatus", pipelineRunId);
}

标签: c#azure-functionsazure-data-factoryazure-durable-functions

解决方案


Durable Functions 与 Azure Functions 的主要区别在于它们是异步运行的。当您触发一个 Durable Function 时,它会创建一个后台进程,并为您提供一些您可以与该进程交互的 URL;包括一个查询其状态。

您可以通过 HTTP 端点触发一个 Durable Function,等待它完成然后获取结果。这是它的样子:

在此处输入图像描述

这是它在直到活动中的样子:

在此处输入图像描述

这是过程:

  1. 首先,我们使用 Azure Function 活动通过 HTTP 触发器触发 Durable Function。

  2. 然后使用直到活动,我们检查该函数的状态。

    • Wait 活动等待大约 30 秒(或不同,取决于您)让函数执行。

    • Web 活动向Azure Function 活动返回的statusQueryUrl发出请求,方法是调用**@activity('StartUntar').output.statusQueryGetUri**

  3. 直到活动用表达式检查 CheckStatus Web 活动的结果**@not(or(equals(activity('CheckStatus').output.runtimeStatus, 'Pending'), equals(activity('CheckStatus').output.runtimeStatus, 'Running')))**

  4. 它会重复直到函数完成或失败,或者直到超时(在 Timeout 属性上设置)


推荐阅读