首页 > 解决方案 > 创建 Azure 函数以执行 ETL

问题描述

我对Azure Functions 还很陌生,所以我很难理解要采取的步骤。

我用 Python 编写了一个代码,它从 MongoDB 数据库中获取特定的 JSON 文件,将其展平并将其作为 CSV 文件导出到 Azure Data Lake Storage。

我做了一些研究,发现我可以创建一个 Azure 函数并使用 Blob 存储触发器,它可以检查 JSON 文件是否已上传到test directory并自动执行我的 Python 脚本以将其展平并将其导出为 CSV 文件.

但是,我现在如何修改我的 Python 脚本,使其能够导入已上传到的 JSON 文件,test directory而不是连接到 MongoDB 数据库?

from pymongo import MongoClient
import pandas as pd
import os, uuid, sys
import collections
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from pandas import json_normalize
from datetime import datetime, timedelta

mongo_client = MongoClient("xxxxxxx")
db = mongo_client.r_db
table = db.areas

document = table.find()
mongo_docs = list(document)
mongo_docs = json_normalize(mongo_docs)
mongo_docs.to_csv("areas.csv", sep = ",", index=False) 

#print(mongo_docs)
try:  
    global service_client
        
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
        "https", "xxxx"), credential='xxxxx')
    

    file_system_client = service_client.get_file_system_client(file_system="root")

    directory_client = file_system_client.get_directory_client("testdirectory")

    file_client = directory_client.create_file("areas.csv")
    local_file = open(r"C:\Users\areas.csv",'rb')

    file_contents = local_file.read()

    file_client.upload_data(file_contents, overwrite=True)

except Exception as e:
    print(e) 

为此类问题创建事件网格触发器或 Blob 存储触发器更好吗?

任何帮助或建议将不胜感激。

标签: pythonazure-functionsetlazure-blob-storage

解决方案


但是,我现在如何修改我的 Python 脚本以使其导入已上传到测试目录而不是连接到 MongoDB 数据库的 JSON 文件?

如果使用 blob 触发器,首先需要将数据传输到存储帐户,然后您可以在函数内部处理数据:

import logging

import azure.functions as func


def main(myblob: func.InputStream):
    #just put the python script here.
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes")

请注意不要使用blob输出绑定,需要手动编写处理逻辑,否则会破坏基于数据湖的对象,无法再使用数据湖包接收。

为此类问题创建事件网格触发器或 Blob 存储触发器更好吗?

任何帮助或建议将不胜感激。

如果你的需求是单一的(即只需要在从数据湖传入数据时执行),那么你可以使用blob触发器。事件网格的优点是端点可以由许多不同的事件触发。


推荐阅读