python-3.x - 使用 Python 接收和写入 Azure blob 存储的 Azure 组合器函数
问题描述
我想使用 Python 创建一个 Azure 函数,该函数将从 Azure 事件中心读取数据。幸运的是,Visual Studio Code 提供了一种创建 Azure 函数骨架的方法。可以根据需要进行编辑。我可以在 Microsoft 文档的帮助下创建一个演示 HTTP 触发器 Azure 函数,但我不知道我应该在下面的函数中进行什么更改,以便它可以从事件中心读取数据并将其写入 Azure Blob 存储。此外,如果有人可以推荐任何博客以获取有关 azure 功能和标准实践的更多详细信息。
更新:
我尝试根据@Stanley 的建议更新我的代码,但可能需要更新代码。我在 Azure 函数中编写了以下代码。
local.settings.json
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "Storage account connection string",
"FUNCTIONS_WORKER_RUNTIME": "python",
"EventHub_ReceiverConnectionString": "Endpoint Connection String of the EventHubNamespace",
"Blob_StorageConnectionString": "Storage account connection string"
}
}
函数.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"authLevel": "function",
"type": "eventHubTrigger",
"direction": "in",
"name": "event",
"eventHubName": "pwo-events",
"connection": "EventHub_ReceiverConnectionString",
"cardinality": "many",
"consumerGroup": "$Default",
"dataType": "binary"
}
]
}
初始化.py
import logging
import azure.functions as func
from azure.storage.blob import BlobClient
storage_connection_string='Storage account connection string'
container_name = ''
def main(event: func.EventHubEvent):
logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
logging.info(f' SequenceNumber = {event.sequence_number}')
logging.info(f' Offset = {event.offset}')
blob_client = BlobClient.from_connection_string(storage_connection_string,container_name,str(event.sequence_number) + ".txt")
blob_client.upload_blob(event.get_body().decode())
在执行了上面的代码之后,一些东西被写入了 blob 容器。但不是txt文件,而是以其他格式保存。此外,如果我多次触发 azure 函数,则文件将被覆盖。我想执行追加操作而不是覆盖。另外,我想将我的文件保存在用户定义的位置。示例:容器/年=/月=/日期= 谢谢!!
解决方案
如果要从 Azure 事件中心读取数据,使用事件中心触发器会容易得多,这是我的测试代码(读取数据并写入存储):
import logging
import azure.functions as func
from azure.storage.blob import BlobClient
import datetime
storage_connection_string=''
container_name = ''
today = datetime.datetime.today()
def main(event: func.EventHubEvent):
logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
logging.info(f' SequenceNumber = {event.sequence_number}')
logging.info(f' Offset = {event.offset}')
blob_client = BlobClient.from_connection_string(
storage_connection_string,container_name,
str(today.year) +"/" + str(today.month) + "/" + str(today.day) + ".txt")
blob_client.upload_blob(event.get_body().decode(),blob_type="AppendBlob")
我使用下面的代码将事件发送到事件中心:
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
async def run():
# Create a producer client to send messages to the event hub.
# Specify a connection string to your event hubs namespace and
# the event hub name.
producer = EventHubProducerClient.from_connection_string(conn_str="<conn string>", eventhub_name="<hub name>")
async with producer:
# Create a batch.
event_data_batch = await producer.create_batch()
# Add events to the batch.
event_data_batch.add(EventData('First event '))
event_data_batch.add(EventData('Second event'))
event_data_batch.add(EventData('Third event'))
# Send the batch of events to the event hub.
await producer.send_batch(event_data_batch)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
我的local.settings.json
:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "<storage account conn str>",
"FUNCTIONS_WORKER_RUNTIME": "python",
"testhubname0123_test_EVENTHUB": "<event hub conn str>"
}
}
我function.json
就像这个文档指出的那样:
{
"scriptFile": "__init__.py",
"bindings": [{
"type": "eventHubTrigger",
"name": "event",
"direction": "in",
"eventHubName": "test01(this is my hubname, pls palce yours here)",
"connection": "testhubname0123_test_EVENTHUB"
}]
}
结果
数据已成功保存到存储中:
推荐阅读
- r - 打包闪亮的应用程序时在哪里存储 ccs 文件
- python - Python中的Zip结构试图仅读取文件夹
- node.js - 将单词列表转换为频率 json
- http - 服务器是否可以返回代码为 204 的正文?
- python - RegEx 匹配温度 (°c)
- python - 使用 ajax 将数据传递给 django 使用 post 方法
- datetime - VBscript 计算特定文件夹中过去 5 分钟内创建的文件数
- directory - 有没有办法以编程方式创建文件夹?
- laravel - 当我的路由是 POST 时,我可以对 GET 请求做些什么?
- c# - 无法删除在 EntityFramework Core 中拥有实体的实体