python - 使用 python 数据块转换 azure 数据工厂中的数据
问题描述
我的任务是将数百万个单个 JSON 文件转换并合并为 BIG CSV 文件。
使用复制活动和映射模式的操作将非常简单,我已经测试过,问题是大量文件的 JSON 格式不正确。
我知道错误是什么,修复也非常简单,我想我可以使用 Python Data 砖块活动来修复字符串,然后将输出传递给一个复制活动,该活动可以将记录合并到一个大的 CSV 文件中。
我有这样的想法,我不确定这是否是解决此任务的正确方法。我不知道在 Data Brick 活动中使用 Copy Activy 的输出
解决方案
听起来您想使用 Azure 数据工厂转换大量单个 JSON 文件,但正如 @KamilNowinski 所说,它现在不支持 Azure。但是,现在您使用的是 Azure Databricks,编写一个简单的 Python 脚本来执行相同的操作对您来说更容易。因此,一种有效的解决方案是直接使用 Azure 存储 SDK 和pandas
Python 包,通过 Azure Databricks 上的几个步骤来完成。
也许这些 JSON 文件都在 Azure Blob Storage 的容器中,因此您需要将它们列出在容器中,
list_blob_names
并使用 pandas 函数的 sas 令牌生成它们的 urlread_json
,代码如下。from azure.storage.blob.baseblobservice import BaseBlobService from azure.storage.blob import ContainerPermissions from datetime import datetime, timedelta account_name = '<your account name>' account_key = '<your account key>' container_name = '<your container name>' service = BaseBlobService(account_name=account_name, account_key=account_key) token = service.generate_container_shared_access_signature(container_name, permission=ContainerPermissions.READ, expiry=datetime.utcnow() + timedelta(hours=1),) blob_names = service.list_blob_names(container_name) blob_urls_with_token = (f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{token}" for blob_name in blob_names) #print(list(blob_urls_with_token))
然后,您可以通过函数直接从 blob 中读取这些 JSON 文件,
read_json
以创建它们的 pandas Dataframe。import pandas as pd for blob_url_with_token in blob_urls_with_token: df = pd.read_json(blob_url_with_token)
即使您想将它们合并到一个大的 CSV 文件中,您也可以先通过
Combining / joining / merging
like中列出的 pandas 函数将它们合并到一个大的 Dataframe 中append
。要将数据框写入 csv 文件,我认为
to_csv
功能非常简单。或者,您可以在 Azure Databricks 上将 pandas 数据帧转换为 PySpark 数据帧,如下面的代码所示。from pyspark.sql import SQLContext from pyspark import SparkContext sc = SparkContext() sqlContest = SQLContext(sc) spark_df = sqlContest.createDataFrame(df)
所以接下来,无论你想做什么,都很简单。如果你想在 Azure Databricks 中将脚本调度为 notebook,可以参考官方文档Jobs
运行 Spark 作业。
希望能帮助到你。
推荐阅读
- javascript - 有没有办法将数组元素添加到 formData 对象,所以 net core [FromForm] 会正确反序列化它们?
- python - 在 PRAW 中,有没有办法为多个用户获取评论流?
- sql - 优化调用代价高昂函数的嵌套选择的最佳方法
- c# - 流利的验证器。为所有模型添加 _validateService
- python - 如何根据列值(或值列表)将熊猫数据框子集为多个数据框
- node.js - Websocket(ws)nodejs处理错误“ENOTFOUND”
- python - 如果其中一个任务失败,我如何中止在多个主机上并行运行的一组结构任务?
- java - java.util.NoSuchElementExeption 关于 Scanner 类?
- android - 如何在 Espresso 中授予许可?
- firebase - 安全问题:使用公共 URL 访问 Firebase 存储文件