首页 > 解决方案 > 从 python 管理 Azure 中的事件中心

问题描述

我正在尝试编写 python 代码,以便能够从 python 脚本在 Azure 事件中心上创建/删除事件中心。我已经设法按照此页面上的文档创建了一个 EventHubManagementClient 。我相信我现在需要使用此处记录的 EventHubsOperations 类。

我有两个挑战:

  1. “from aaaa import EventHubsOperations”行中的“aaaa”是什么,以便能够引用该类?我似乎找不到如何调用相应的包来导入类......
  2. 在使用类时,为配置、序列化器和反序列化器传递哪些值都是必需的值?也许有人可以分享一个如何使用这个类的例子?

理想情况下,我希望调用 create_or_delete 方法来创建新的事件中心或从 python 脚本中删除现有的事件中心。如果有人可以分享如何扩展此代码以实现此目的,我将不胜感激。该文档似乎非常轻松:“配置,必需,服务客户端配置”......

我的代码如下:

import setenv
import os
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential

setenv.import_env_vars('')

vault_url = os.environ["KEY_VAULT_URL"]
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"]

credential = DefaultAzureCredential()

print('Creating EH_client...')
EH_client = EventHubManagementClient(vault_url, credential, subscription_id, base_url=None)
print('Created.')

EventHubsOperations(EH_client)

结果输出如下:

Project root: 
filename: env_values
Creating EH_client...
Created.
Traceback (most recent call last):
  File "/home/db533/gitRepos/GunaBot2/azure-mgmt/azure_test.py", line 25, in <module>
    EventHubsOperations(EH_client)
NameError: name 'EventHubsOperations' is not defined

Process finished with exit code 1

标签: pythonazure-eventhub

解决方案


这是我用于从 python 创建和删除 eventthubs 的代码。

我使用单独的脚本 (setenv.py) 来加载存储在文本文件中的环境变量。

import os
import setenv
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import (StorageAccountCreateParameters,Sku,SkuName,Kind)

set_env_path="C:\\Users\\db533\\PycharmProjects\\GunaBot2\\shared_files\\"
setenv.import_env_vars(set_env_path,'env_values')

def main():
    SUBSCRIPTION_ID = os.environ.get("AZURE_SUBSCRIPTION_ID", None)
    GROUP_NAME = "annabot-eventhub2"
    STORAGE_ACCOUNT_NAME = "storageaccountxyztest"
    NAMESPACE_NAME = "annabot-eventhub999"
    EVENTHUB_NAME = "worker99901"

    tenant_id = os.environ["AZURE_TENANT_ID"]
    client_id = os.environ["AZURE_CLIENT_ID"]
    client_secret = os.environ["AZURE_CLIENT_SECRET"]
    print('AZURE_CLIENT_SECRET:',client_secret)

    credential_common = ServicePrincipalCredentials(tenant=tenant_id, client_id=client_id, secret=client_secret)

    # Create client
    print(" Create resource client...")
    resource_client = ResourceManagementClient(credential_common, SUBSCRIPTION_ID)

    print(" Create Event hub client...")
    eventhub_client = EventHubManagementClient(credential_common,SUBSCRIPTION_ID)

    print(" Create storage client...")
    storage_client = StorageManagementClient(credential_common,SUBSCRIPTION_ID)

    # Create resource group
    print(" Create resource group...")
    resource_client.resource_groups.create_or_update(
        GROUP_NAME,
        {"location": "germanywestcentral"}
    )

    # Create StorageAccount
    print(" Create storageAccount...")
    storage_async_operation = storage_client.storage_accounts.create(
        GROUP_NAME,
        STORAGE_ACCOUNT_NAME,
        StorageAccountCreateParameters(
            sku=Sku(name=SkuName.standard_lrs),
            kind=Kind.storage_v2,
            location='germanywestcentral'
        )
    )
    storage_account = storage_async_operation.result()

    # Create Namespace
    print(" Create event hub namespace...")
    eventhub_client.namespaces.create_or_update(
        GROUP_NAME,
        NAMESPACE_NAME,
        {
          "sku": {
            "name": "Standard",
            "tier": "Standard"
          },
          "location": "Germany West Central",
          "tags": {
            "tag1": "value1",
            "tag2": "value2"
          },
          "kafka_enabled": "True"
        }
    ).result()

    # Create EventHub
    print(" Create event hub...")
    eventhub = eventhub_client.event_hubs.create_or_update(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME,
        {
          "message_retention_in_days": "4",
          "partition_count": "4",
          "status": "Active",
          "capture_description": {
            "enabled": True,
            "encoding": "Avro",
            "interval_in_seconds": "120",
            "size_limit_in_bytes": "10485763",
            "destination": {
              "name": "EventHubArchive.AzureBlockBlob",
              "storage_account_resource_id": "/subscriptions/" + SUBSCRIPTION_ID + "/resourceGroups/" + GROUP_NAME + "/providers/Microsoft.Storage/storageAccounts/" + STORAGE_ACCOUNT_NAME + "",
              "blob_container": "container",
              "archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
            }
          }
        }
    )
    print("Created EventHub: {}".format(eventhub))

    # Get EventHub
    eventhub = eventhub_client.event_hubs.get(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME
    )
    print("get() for EventHub: {}\n".format(eventhub))

    #Create authorisation rule
    eventhub_rule = eventhub_client.event_hubs.create_or_update_authorization_rule(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager",
        rights=["LISTEN","SEND"]
    )
    print("create_or_update_authorization_rule() for Manager for EventHub: {}\n".format(eventhub_rule))

    # Get authorisation rule
    eventhub_rule2 = eventhub_client.event_hubs.get_authorization_rule(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager"
    )
    print("get_authorization_rule() for manager for EventHub: {}\n".format(eventhub_rule2))

    # List keys
    namespace_keys = eventhub_client.event_hubs.list_keys(
        GROUP_NAME,
        NAMESPACE_NAME,
        event_hub_name=EVENTHUB_NAME,
        authorization_rule_name="manager"
    )
    print("list_keys() for EventHub: {}\n".format(namespace_keys))
    print("namespace_keys.primary_connection_string:",namespace_keys.primary_connection_string)

# Delete EventHub
    eventhub_client.event_hubs.delete(
        GROUP_NAME,
        NAMESPACE_NAME,
        EVENTHUB_NAME
    )
    print("Delete EventHub.")

    # Delete Namespace
    eventhub_client.namespaces.delete(
        GROUP_NAME,
        NAMESPACE_NAME
    ).result()

    # Delete StorageAccount
    storage_client.storage_accounts.delete(
        GROUP_NAME,
        STORAGE_ACCOUNT_NAME
    )

    # Delete resource group
    resource_client.resource_groups.delete(
        GROUP_NAME
    ).result()


if __name__ == "__main__":
    main()

加载环境变量的 setenv.py 脚本如下。(我从另一个答案中得到了这个。不能为此归功于......):

import os

def import_env_vars(env_folder,env_filename):
    """Imports some environment variables from a special .env file in the
    project root directory.
    """
    print("env_folder:",env_folder)
    if len(env_folder) > 0 and env_folder[-1] != '\\':
        env_folder += '\\'
    try:
        print("filename:",env_folder+env_filename)
        envfile = open(env_folder+env_filename, "r")

    except IOError:
        raise Exception("You must have a {0} file in your project root "
                        "in order to run the server in your local machine. "
                        "This specifies some necessary environment variables. ")
    for line in envfile.readlines():
        [key,value] = line.strip().split("=")
        os.environ[key] = value
        print("key:",key)
        print("value:", value)

环境变量在文件中定义如下:

EVENTHUB_SERVER=gunabot-eventhub.servicebus.windows.net
DEV_STAGE=Dev
AZURE_SUBSCRIPTION_ID=xxxxxxxxx-xxxx-xxxxxxx-xxxxx-xxxx
AZURE_TENANT_ID=yyyyyyyyy-yyyyy-yyyyyy-yyyyyy
AZURE_CLIENT_ID=zzzzzz-zzzzzz-zzzzzz-zzzzzzz-zzz
AZURE_CLIENT_SECRET=qqqqq-qqqq-qqqqqqq-qqqqq-qqqqq

希望这对其他人有帮助。


推荐阅读