首页 > 解决方案 > 使用 python 数据块转换 azure 数据工厂中的数据

问题描述

我的任务是将数百万个单个 JSON 文件转换并合并为 BIG CSV 文件。

使用复制活动和映射模式的操作将非常简单,我已经测试过,问题是大量文件的 JSON 格式不正确。

我知道错误是什么,修复也非常简单,我想我可以使用 Python Data 砖块活动来修复字符串,然后将输出传递给一个复制活动,该活动可以将记录合并到一个大的 CSV 文件中。

我有这样的想法,我不确定这是否是解决此任务的正确方法。我不知道在 Data Brick 活动中使用 Copy Activy 的输出 在此处输入图像描述

标签: pythonazureazure-data-factory-2

解决方案


听起来您想使用 Azure 数据工厂转换大量单个 JSON 文件,但正如 @KamilNowinski 所说,它现在不支持 Azure。但是,现在您使用的是 Azure Databricks,编写一个简单的 Python 脚本来执行相同的操作对您来说更容易。因此,一种有效的解决方案是直接使用 Azure 存储 SDK 和pandasPython 包,通过 Azure Databricks 上的几个步骤来完成。

  1. 也许这些 JSON 文件都在 Azure Blob Storage 的容器中,因此您需要将它们列出在容器中,list_blob_names并使用 pandas 函数的 sas 令牌生成它们的 url read_json,代码如下。

    from azure.storage.blob.baseblobservice import BaseBlobService
    from azure.storage.blob import ContainerPermissions
    from datetime import datetime, timedelta
    
    account_name = '<your account name>'
    account_key = '<your account key>'
    container_name = '<your container name>'
    
    service = BaseBlobService(account_name=account_name, account_key=account_key)
    token = service.generate_container_shared_access_signature(container_name, permission=ContainerPermissions.READ, expiry=datetime.utcnow() + timedelta(hours=1),)
    
    blob_names = service.list_blob_names(container_name)
    blob_urls_with_token = (f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{token}" for blob_name in blob_names)
    
    #print(list(blob_urls_with_token))
    
  2. 然后,您可以通过函数直接从 blob 中读取这些 JSON 文件,read_json以创建它们的 pandas Dataframe。

    import pandas as pd
    
    for blob_url_with_token in blob_urls_with_token:
        df = pd.read_json(blob_url_with_token)
    

    即使您想将它们合并到一个大的 CSV 文件中,您也可以先通过Combining / joining / merginglike中列出的 pandas 函数将它们合并到一个大的 Dataframe 中append

  3. 要将数据框写入 csv 文件,我认为to_csv功能非常简单。或者,您可以在 Azure Databricks 上将 pandas 数据帧转换为 PySpark 数据帧,如下面的代码所示。

    from pyspark.sql import SQLContext
    from pyspark import SparkContext
    
    sc = SparkContext()
    sqlContest = SQLContext(sc)
    spark_df = sqlContest.createDataFrame(df)
    

所以接下来,无论你想做什么,都很简单。如果你想在 Azure Databricks 中将脚本调度为 notebook,可以参考官方文档Jobs运行 Spark 作业。

希望能帮助到你。


推荐阅读