首页 > 解决方案 > 使用 Python 接收和写入 Azure blob 存储的 Azure 组合器函数

问题描述

我想使用 Python 创建一个 Azure 函数,该函数将从 Azure 事件中心读取数据。幸运的是,Visual Studio Code 提供了一种创建 Azure 函数骨架的方法。可以根据需要进行编辑。我可以在 Microsoft 文档的帮助下创建一个演示 HTTP 触发器 Azure 函数,但我不知道我应该在下面的函数中进行什么更改,以便它可以从事件中心读取数据并将其写入 Azure Blob 存储。此外,如果有人可以推荐任何博客以获取有关 azure 功能和标准实践的更多详细信息。

更新:

我尝试根据@Stanley 的建议更新我的代码,但可能需要更新代码。我在 Azure 函数中编写了以下代码。

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "Storage account connection string",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "EventHub_ReceiverConnectionString": "Endpoint Connection String of the EventHubNamespace",
    "Blob_StorageConnectionString": "Storage account connection string"
  }
}

函数.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "eventHubTrigger",
      "direction": "in",
      "name": "event",
      "eventHubName": "pwo-events",
      "connection": "EventHub_ReceiverConnectionString",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "binary"
    }
  ]
}

初始化.py

import logging
import azure.functions as func
from azure.storage.blob import BlobClient

storage_connection_string='Storage account connection string'
container_name = ''


def main(event: func.EventHubEvent):
    logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
    logging.info(f'  SequenceNumber = {event.sequence_number}')
    logging.info(f'  Offset = {event.offset}')
    blob_client = BlobClient.from_connection_string(storage_connection_string,container_name,str(event.sequence_number) + ".txt")
    blob_client.upload_blob(event.get_body().decode())

以下是我的 blob 容器的屏幕截图: 在此处输入图像描述

在执行了上面的代码之后,一些东西被写入了 blob 容器。但不是txt文件,而是以其他格式保存。此外,如果我多次触发 azure 函数,则文件将被覆盖。我想执行追加操作而不是覆盖。另外,我想将我的文件保存在用户定义的位置。示例:容器/年=/月=/日期= 谢谢!!

标签: python-3.xazurehttpazure-functionsazure-eventhub

解决方案


如果要从 Azure 事件中心读取数据,使用事件中心触发器会容易得多,这是我的测试代码(读取数据并写入存储):

import logging
import azure.functions as func
from azure.storage.blob import BlobClient
import datetime

storage_connection_string=''
container_name = ''

today = datetime.datetime.today()


def main(event: func.EventHubEvent):
    logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
    logging.info(f'  SequenceNumber = {event.sequence_number}')
    logging.info(f'  Offset = {event.offset}')

    blob_client =  BlobClient.from_connection_string(
        storage_connection_string,container_name,
    str(today.year) +"/" + str(today.month) + "/" + str(today.day) + ".txt")

blob_client.upload_blob(event.get_body().decode(),blob_type="AppendBlob")

我使用下面的代码将事件发送到事件中心:

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str="<conn string>", eventhub_name="<hub name>")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

我的local.settings.json

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "<storage account conn str>",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "testhubname0123_test_EVENTHUB": "<event hub conn str>"
    }
}

function.json就像这个文档指出的那样:

{
    "scriptFile": "__init__.py",
    "bindings": [{
        "type": "eventHubTrigger",
        "name": "event",
        "direction": "in",
        "eventHubName": "test01(this is my hubname, pls palce yours here)&quot;,
        "connection": "testhubname0123_test_EVENTHUB"
    }]
}

结果

使用上面的代码运行该函数并将数据发送到事件中心: 在此处输入图像描述

数据已成功保存到存储中:

在此处输入图像描述

下载.txt并查看其内容我们可以看到已经写入了3个事件内容: 在此处输入图像描述


推荐阅读