python - 创建 Azure 函数以执行 ETL
问题描述
我对Azure Functions 还很陌生,所以我很难理解要采取的步骤。
我用 Python 编写了一个代码,它从 MongoDB 数据库中获取特定的 JSON 文件,将其展平并将其作为 CSV 文件导出到 Azure Data Lake Storage。
我做了一些研究,发现我可以创建一个 Azure 函数并使用 Blob 存储触发器,它可以检查 JSON 文件是否已上传到test directory
并自动执行我的 Python 脚本以将其展平并将其导出为 CSV 文件.
但是,我现在如何修改我的 Python 脚本,使其能够导入已上传到的 JSON 文件,test directory
而不是连接到 MongoDB 数据库?
from pymongo import MongoClient
import pandas as pd
import os, uuid, sys
import collections
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from pandas import json_normalize
from datetime import datetime, timedelta
mongo_client = MongoClient("xxxxxxx")
db = mongo_client.r_db
table = db.areas
document = table.find()
mongo_docs = list(document)
mongo_docs = json_normalize(mongo_docs)
mongo_docs.to_csv("areas.csv", sep = ",", index=False)
#print(mongo_docs)
try:
global service_client
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
"https", "xxxx"), credential='xxxxx')
file_system_client = service_client.get_file_system_client(file_system="root")
directory_client = file_system_client.get_directory_client("testdirectory")
file_client = directory_client.create_file("areas.csv")
local_file = open(r"C:\Users\areas.csv",'rb')
file_contents = local_file.read()
file_client.upload_data(file_contents, overwrite=True)
except Exception as e:
print(e)
为此类问题创建事件网格触发器或 Blob 存储触发器更好吗?
任何帮助或建议将不胜感激。
解决方案
但是,我现在如何修改我的 Python 脚本以使其导入已上传到测试目录而不是连接到 MongoDB 数据库的 JSON 文件?
如果使用 blob 触发器,首先需要将数据传输到存储帐户,然后您可以在函数内部处理数据:
import logging
import azure.functions as func
def main(myblob: func.InputStream):
#just put the python script here.
logging.info(f"Python blob trigger function processed blob \n"
f"Name: {myblob.name}\n"
f"Blob Size: {myblob.length} bytes")
请注意不要使用blob输出绑定,需要手动编写处理逻辑,否则会破坏基于数据湖的对象,无法再使用数据湖包接收。
为此类问题创建事件网格触发器或 Blob 存储触发器更好吗?
任何帮助或建议将不胜感激。
如果你的需求是单一的(即只需要在从数据湖传入数据时执行),那么你可以使用blob触发器。事件网格的优点是端点可以由许多不同的事件触发。
推荐阅读
- javascript - 如何在 Angular 中实现表单投影?
- postgresql - PostgreSQL 无法识别数据库
- python - 使用 pdoc 记录来自不同模块的函数
- python - 如何使用 pyQt 和 appScheduler 更改视图?
- algorithm - Alpha-beta 修剪与领带导致可避免的损失
- oracle - DBA_TAB_MODIFICATIONS 是如何工作的
- javascript - 基于视口中元素的可见性和持续时间的操作
- django - 无论我做什么,都会不断收到错误“没有这样的表”
- ruby - 如何在类(ruby)的方法中显示名称?
- c++ - 对此有编译器警告吗?