python - 在 Azure Autonmation Runbook 中使用 Python 通过 JobId 获取作业的输出
问题描述
我想在 Azure 自动化中使用 Python 获取作业的输出或其状态。我可以在 powershell 中执行,但我想在 Python 中执行。python 中是否有任何等效的 SDK 用于 -AzureRmAutomationJob?
解决方案
下面是创建示例 Azure 自动化任务并获取作业输出的示例代码。
"""
import time
import uuid
import requests
import automationassets
# Automation resource group and account to start runbook job in
_AUTOMATION_RESOURCE_GROUP = "contoso"
_AUTOMATION_ACCOUNT = "contosodev"
# Set up required body values for a runbook.
# Make sure you have a hello_world_python runbook published in the automation account
# with an argument of -n
body = {
"properties":
{
"runbook":
{
"name":"hello_world_python"
},
"parameters":
{
"[PARAMETER 1]":"-n",
"[PARAMETER 2]":"world"
}
}
}
# Return token based on Azure automation Runas connection
def get_automation_runas_token(runas_connection):
""" Returs a token that can be used to authenticate against Azure resources """
from OpenSSL import crypto
import adal
# Get the Azure Automation RunAs service principal certificate
cert = automationassets.get_automation_certificate("AzureRunAsCertificate")
sp_cert = crypto.load_pkcs12(cert)
pem_pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, sp_cert.get_privatekey())
# Get run as connection information for the Azure Automation service principal
application_id = runas_connection["ApplicationId"]
thumbprint = runas_connection["CertificateThumbprint"]
tenant_id = runas_connection["TenantId"]
# Authenticate with service principal certificate
resource = "https://management.core.windows.net/"
authority_url = ("https://login.microsoftonline.com/" + tenant_id)
context = adal.AuthenticationContext(authority_url)
azure_credential = context.acquire_token_with_client_certificate(
resource,
application_id,
pem_pkey,
thumbprint)
# Return the token
return azure_credential.get('accessToken')
# Authenticate to Azure using the Azure Automation RunAs service principal
automation_runas_connection = automationassets.get_automation_connection("AzureRunAsConnection")
access_token = get_automation_runas_token(automation_runas_connection)
# Set what resources to act against
subscription_id = str(automation_runas_connection["SubscriptionId"])
job_id = str(uuid.uuid4())
# Set up URI to create a new automation job
uri = ("https://management.azure.com/subscriptions/" + subscription_id
+ "/resourceGroups/" + _AUTOMATION_RESOURCE_GROUP
+ "/providers/Microsoft.Automation/automationAccounts/" + _AUTOMATION_ACCOUNT
+ "/jobs/(" + job_id + ")?api-version=2015-10-31")
# Make request to create new automation job
headers = {"Authorization": 'Bearer ' + access_token}
json_output = requests.put(uri, json=body, headers=headers).json()
# Get results of the automation job
_RETRY = 360 # stop after 60 minutes (360 * 10 sleep seconds / 60 seconds in a minute)
_SLEEP_SECONDS = 10
status_counter = 0
while status_counter < _RETRY:
status_counter = status_counter + 1
job = requests.get(uri, headers=headers).json()
status = job['properties']['status']
if status == 'Completed' or status == 'Failed' or status == 'Suspended' or status == 'Stopped':
break
time.sleep(_SLEEP_SECONDS)
# if job did not complete in an hour, throw an exception
if status_counter == 360:
raise StandardError("Job did not complete in 60 minutes.")
if job['properties']['status'] != 'Completed':
raise StandardError("Job did not complete successfully.")
# Get output streams from the job
uri = ("https://management.azure.com/subscriptions/" + subscription_id
+ "/resourceGroups/" + _AUTOMATION_RESOURCE_GROUP
+ "/providers/Microsoft.Automation/automationAccounts/" + _AUTOMATION_ACCOUNT
+ "/jobs/" + job_id
+ "/streams?$filter=properties/streamType%20eq%20'Output'&api-version=2015-10-31")
job_streams = requests.get(uri, headers=headers).json()
# For each stream id, print out the text
for stream in job_streams['value']:
uri = ("https://management.azure.com/subscriptions/" + subscription_id
+ "/resourceGroups/" + _AUTOMATION_RESOURCE_GROUP
+ "/providers/Microsoft.Automation/automationAccounts/" + _AUTOMATION_ACCOUNT
+ "/jobs/" + job_id
+ "/streams/" + stream['properties']['jobStreamId']
+ "?$filter=properties/streamType%20eq%20'Output'&api-version=2015-10-31")
output_stream = requests.get(uri, headers=headers).json()
print output_stream['properties']['streamText']
您可以参考此网址以供进一步参考。希望对您有所帮助。
推荐阅读
- python - 如何防止值转换为日期或作为除法执行?
- google-cloud-platform - 使用 api 列出 gcp 项目资源类型
- c# - 无法通过先前已获得的 ID 获取记录
- javascript - 在不知道元素总数的情况下从数据流中随机拆分元素
- keras - keras:同时训练一个分层分类+回归模型
- php - 在其他设备上访问我的 Web 应用程序时出现错误 404
- javascript - 将 Date 实例传递给 Date 构造函数以克隆 Dates
- c++ - 线性搜索与二分搜索效率
- c# - Newtonsoft JSON 动态反序列化
- java - Flyway(SpringBoot)迁移与多个目录中的文件跳过版本