首页 > 解决方案 > 如何在 Azure 数据工厂 - Databricks 中使用 continuation_token 获取 ADF 管道运行详细信息的下一页?

问题描述

我在用

adf_client.pipeline_runs.query_by_factory(resourceGroupName, factoryName, filter_parameters)

azure.mgmt.datafactory.DataFactoryManagementClient包的方法,用于获取 ADF 管道运行详细信息。

上述函数的响应一次返回 100 条管道运行记录。与响应一起,它返回continuation_token,我相信它应该用于获取下一组/页面记录。

我不确定要使用哪个功能。我尝试使用azure.mgmt.datafactory.models.PipelineRun()函数(反复试验)来查看是否满足要求。不幸的是,事实并非如此。MS 文档也很抽象,很难理解。

那么,Azure Python SDK 中的哪个函数可以用来获取下一页的运行记录呢?

标签: pythonazureazure-data-factoryazure-databricks

解决方案


continuation_token当你有下一页结果时,如果存在任何剩余的结果,你会得到,否则为 null 。

这是其用法的示例,但是我目前没有足够的管道运行来显示令牌本身。

在此处输入图像描述

现在,在您的情况下,您已经收到了一个,所以这里是您如何使用它。

考虑pipeline_runs是保存结果,pipeline_runs.continuation_token是我们需要获取并在另一个请求中传回以获取下一页的内容。

添加一个简单的循环,例如While检查是否pipeline_runs.continuation_token存在并请求下一页,直到返回的令牌值是Null- 结果的结尾。

在此处输入图像描述

完整的工作实现:

from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
from azure.identity import ClientSecretCredential 


subscription_id = "b83c1wd3-xxxx-xxxx-xxxx-2b83a074c23f"
rg_name = "My-rg"
df_name = "ktestadf"

tenant_id = "12f978bf-xxxx-xxxx-xxxx-2d7cd011db47"
client_id = "a71ad3ca-xxxx-xxxx-xxxx-af0c2a3fdae1"
client_secret = "Nym7Q~j5YMyxxxxxx3tAk879y9vLrxAQqaI8n"

credentials = ServicePrincipalCredentials(client_id=client_id, secret=client_secret, tenant=tenant_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)

filter_params = PipelineRunFilterParameters(last_updated_after=datetime.now() - timedelta(30), last_updated_before=datetime.now() + timedelta(1))
pipeline_runs = adf_client.pipeline_runs.query_by_factory(resource_group_name=rg_name, factory_name=df_name, filter_parameters = filter_params)


for pipeline_run in pipeline_runs.value:

    print(pipeline_run)

while (pipeline_runs.continuation_token):

    pipeline_runs = adf_client.pipeline_runs.query_by_factory(
    resource_group_name=rg_name, factory_name=df_name, filter_parameters=filter_params, continuation_token = pipeline_runs.continuation_token)
    print(pipeline_runs.value)

您可以选择不打印较早pipeline_runsfor循环调用,我只是将它们显示在代码中以供参考。


推荐阅读