Azure Data Factory data flow: CSV schema drift to a static Parquet destination drops columns. Is it possible?

Problem Description

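I'm trying to write an Azure Data Factory data flow that processes two similar, versioned CSV files. Version 1 files have 48 columns. Version 2 files have 50 columns: the same 48 columns as version 1, with 2 additional columns appended at the end. I want to create a destination Parquet file containing all 50 columns, to load into my SQL DW via PolyBase. Historically, we have over 6,000 files in the same blob source, with no easy way to tell the 48-column files from the 50-column ones. Below is the closest I have come to a solution.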

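  1. Source CSV with Allow schema drift enabled. No schema is defined on the CSV dataset.
  2. MapDrifted derived column, i.e. toString(byName('Manufacturer')), for all 50 columns.
  3. Sink: the dataset is Parquet, with its schema defined by a Parquet template file that contains all 50 columns. Sink partitioning is set by sourcefilename. Each incoming file produces one Parquet file in the output.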

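This solution works for a set of two test files, one with 48 columns and one with 50. Two Parquet files with 50 columns are created: one populated up to column 48, the other with all 50 columns populated. But if I add more 48-column source files to the test, the 50-column file loses the data in its last two columns and ends up with only 48 columns. I would have thought this was a common problem that ADF could solve, i.e. file versions changing over time. Any suggestions? Below is the script for my ADF data flow.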

source(allowSchemaDrift: true,
    validateSchema: false,
    rowUrlColumn: 'sourcefilename',
    inferDriftedColumnTypes: true,
    multiLineRow: true,
    wildcardPaths:['avail/archive_csv2/*.csv']) ~> SRCAvailCSV
SRCAvailCSV derive(Manufacturer = toString(byName('Manufacturer')),
        SKU = toString(byName('SKU')),
        {Partner Name} = toString(byName('Partner Name')),
        {Partner Part Number} = toString(byName('Partner Part Number')),
        {Search Date} = toString(byName('Search Date')),
        {Search Result Description} = toString(byName('Search Result Description')),
        {1st Line Description} = toString(byName('1st Line Description')),
        {2nd Line Description} = toString(byName('2nd Line Description')),
        {Product Category} = toString(byName('Product Category')),
        {Product Category 1} = toString(byName('Product Category 1')),
        {Product Category 2} = toString(byName('Product Category 2')),
        {Product Category 3} = toString(byName('Product Category 3')),
        {Product Category 4} = toString(byName('Product Category 4')),
        {UNSPSC Code} = toString(byName('UNSPSC Code')),
        Pricing = toString(byName('Pricing')),
        Currency = toString(byName('Currency')),
        {Availability Qty} = toString(byName('Availability Qty')),
        {Availability Status} = toString(byName('Availability Status')),
        {Average Rating} = toString(byName('Average Rating')),
        {Total Reviews} = toString(byName('Total Reviews')),
        Brand = toString(byName('Brand')),
        Model = toString(byName('Model')),
        {Product Line} = toString(byName('Product Line')),
        {Partner Site} = toString(byName('Partner Site')),
        {Product URL} = toString(byName('Product URL')),
        Warranty = toString(byName('Warranty')),
        {Product Length} = toString(byName('Product Length')),
        {Product Width} = toString(byName('Product Width')),
        {Product Height} = toString(byName('Product Height')),
        {Product Depth} = toString(byName('Product Depth')),
        {Product Weight} = toString(byName('Product Weight')),
        {Fullfilling Partner} = toString(byName('Fullfilling Partner')),
        {Date First Available} = toString(byName('Date First Available')),
        {Frequently Bought Together 1} = toString(byName('Frequently Bought Together 1')),
        {Frequently Bought Together 1 Part Number} = toString(byName('Frequently Bought Together 1 Part Number')),
        {Frequently Bought Together 2} = toString(byName('Frequently Bought Together 2')),
        {Frequently Bought Together 2 Part Number} = toString(byName('Frequently Bought Together 2 Part Number')),
        {Frequently Bought Together 3} = toString(byName('Frequently Bought Together 3')),
        {Frequently Bought Together 3 Part Number} = toString(byName('Frequently Bought Together 3 Part Number')),
        {Frequently Bought Together 4} = toString(byName('Frequently Bought Together 4')),
        {Frequently Bought Together 4 Part Number} = toString(byName('Frequently Bought Together 4 Part Number')),
        {From the Manufacturer} = toString(byName('From the Manufacturer')),
        {Bestesellers Rank 1} = toString(byName('Bestesellers Rank 1')),
        {Bestsellers Rank 2} = toString(byName('Bestsellers Rank 2')),
        {Bestsellers Rank 3} = toString(byName('Bestsellers Rank 3')),
        {Bestsellers Rank 4} = toString(byName('Bestsellers Rank 4')),
        Endpoint = toString(byName('Endpoint')),
        {Related StarTech.com SKU} = toString(byName('Related StarTech.com SKU')),
        {Search SKU} = toString(byName('Search SKU')),
        {Search Manufacturer} = toString(byName('Search Manufacturer')),
        sourcefilename = sourcefilename) ~> MapDrifted1
MapDrifted1 sink(input(
        FileName as string,
        Manufacturer as string,
        SKU as string,
        PartnerName as string,
        PartnerPartNumber as string,
        SearchDate as string,
        SearchResultDescription as string,
        {1stLineDescription} as string,
        {2ndLineDescription} as string,
        ProductCategory as string,
        ProductCategory1 as string,
        ProductCategory2 as string,
        ProductCategory3 as string,
        ProductCategory4 as string,
        UNSPSCCode as string,
        Pricing as string,
        Currency as string,
        AvailabilityQty as string,
        AvailabilityStatus as string,
        AverageRating as string,
        TotalReviews as string,
        Brand as string,
        Model as string,
        ProductLine as string,
        PartnerSite as string,
        ProductURL as string,
        Warranty as string,
        ProductLength as string,
        ProductWidth as string,
        ProductHeight as string,
        ProductDepth as string,
        ProductWeight as string,
        FullfillingPartner as string,
        DateFirstAvailable as string,
        FrequentlyBoughtTogether1 as string,
        FrequentlyBoughtTogether1PartNumber as string,
        FrequentlyBoughtTogether2 as string,
        FrequentlyBoughtTogether2PartNumber as string,
        FrequentlyBoughtTogether3 as string,
        FrequentlyBoughtTogether3PartNumber as string,
        FrequentlyBoughtTogether4 as string,
        FrequentlyBoughtTogether4PartNumber as string,
        FromtheManufacturer as string,
        BestesellersRank1 as string,
        BestsellersRank2 as string,
        BestsellersRank3 as string,
        BestsellersRank4 as string,
        Endpoint as string,
        RelatedStarTechcomSKU as string,
        SearchSKU as string,
        SearchManufacturer as string
    ),
    allowSchemaDrift: false,
    validateSchema: false,
    format: 'parquet',
    rowUrlColumn:'sourcefilename',
    mapColumn(
        FileName = sourcefilename,
        Manufacturer,
        SKU,
        PartnerName = {Partner Name},
        PartnerPartNumber = {Partner Part Number},
        SearchDate = {Search Date},
        SearchResultDescription = {Search Result Description},
        {1stLineDescription} = {1st Line Description},
        {2ndLineDescription} = {2nd Line Description},
        ProductCategory = {Product Category},
        ProductCategory1 = {Product Category 1},
        ProductCategory2 = {Product Category 2},
        ProductCategory3 = {Product Category 3},
        ProductCategory4 = {Product Category 4},
        UNSPSCCode = {UNSPSC Code},
        Pricing,
        Currency,
        AvailabilityQty = {Availability Qty},
        AvailabilityStatus = {Availability Status},
        AverageRating = {Average Rating},
        TotalReviews = {Total Reviews},
        Brand,
        Model,
        ProductLine = {Product Line},
        PartnerSite = {Partner Site},
        ProductURL = {Product URL},
        Warranty,
        ProductLength = {Product Length},
        ProductWidth = {Product Width},
        ProductHeight = {Product Height},
        ProductDepth = {Product Depth},
        ProductWeight = {Product Weight},
        FullfillingPartner = {Fullfilling Partner},
        DateFirstAvailable = {Date First Available},
        FrequentlyBoughtTogether1 = {Frequently Bought Together 1},
        FrequentlyBoughtTogether1PartNumber = {Frequently Bought Together 1 Part Number},
        FrequentlyBoughtTogether2 = {Frequently Bought Together 2},
        FrequentlyBoughtTogether2PartNumber = {Frequently Bought Together 2 Part Number},
        FrequentlyBoughtTogether3 = {Frequently Bought Together 3},
        FrequentlyBoughtTogether3PartNumber = {Frequently Bought Together 3 Part Number},
        FrequentlyBoughtTogether4 = {Frequently Bought Together 4},
        FrequentlyBoughtTogether4PartNumber = {Frequently Bought Together 4 Part Number},
        FromtheManufacturer = {From the Manufacturer},
        BestesellersRank1 = {Bestesellers Rank 1},
        BestsellersRank2 = {Bestsellers Rank 2},
        BestsellersRank3 = {Bestsellers Rank 3},
        BestsellersRank4 = {Bestsellers Rank 4},
        Endpoint,
        RelatedStarTechcomSKU = {Related StarTech.com SKU},
        SearchSKU = {Search SKU},
        SearchManufacturer = {Search Manufacturer}
    )) ~> sink1

Tags: azure-data-factory, azure-data-flow

Solution


Do you always want the output Parquet files to have the same schema, i.e. 50 columns regardless of the schema of the incoming file?

If so, you can build the data flow around a "canonical model" that defines this 50-column structure.

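You would use a Derived Column to build the target schema definition and map the incoming source columns to it. Where an incoming file has no matching column, you can set the value to NULL.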
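To make the idea concrete outside of ADF, here is a minimal Python sketch of the same "canonical model" projection. It is not data flow script and not the answerer's implementation. `CANONICAL_COLUMNS`, `to_canonical`, and `read_canonical` are hypothetical names, the list is shortened to four columns, and the sample rows are made up. The question does not say which two columns were appended in version 2, so the choice below is purely illustrative.

```python
import csv
import io

# Hypothetical, shortened canonical model; the real one would list all 50 columns.
CANONICAL_COLUMNS = ["Manufacturer", "SKU", "Search SKU", "Search Manufacturer"]


def to_canonical(row, columns=CANONICAL_COLUMNS):
    """Project one parsed CSV row onto the fixed column list.

    Columns that are absent from the row come back as None (the NULL fill).
    """
    return {name: row.get(name) for name in columns}


def read_canonical(csv_text, columns=CANONICAL_COLUMNS):
    """Parse CSV text and return every row in the canonical shape."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [to_canonical(row, columns) for row in reader]


# Made-up sample data: the first file lacks the two trailing columns.
v1 = "Manufacturer,SKU\nAcme,A-1\n"
v2 = "Manufacturer,SKU,Search SKU,Search Manufacturer\nAcme,A-2,S-2,Acme Corp\n"

rows = read_canonical(v1) + read_canonical(v2)
```

Every row ends up with the same keys in the same order, whichever file version it came from, which is the property the fixed Parquet schema needs.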

With this approach, you do not need to define a schema on the sink dataset. You can use Auto Map with a blank dataset and output Parquet files.

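The Parquet output schema will match your Derived Column model, which is where you would define the friendly aliases that you currently set in the sink mapping above.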
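The friendly aliases in the question's sink mapping appear to be the source headers with spaces and punctuation removed, for example {Related StarTech.com SKU} becomes RelatedStarTechcomSKU. As a rough sketch, and assuming that pattern holds for every column, the aliases could be generated rather than typed by hand. `friendly_alias` is a hypothetical helper, not an ADF function.

```python
import re


def friendly_alias(source_name):
    """Drop every character that is not an ASCII letter or digit."""
    return re.sub(r"[^0-9A-Za-z]", "", source_name)


# A few source headers taken from the question's script.
source_names = ["Partner Name", "1st Line Description", "Related StarTech.com SKU"]
aliases = {name: friendly_alias(name) for name in source_names}
```

The resulting pairs could then be pasted into the Derived Column definitions, so the names are set once in the model instead of again in the sink.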

Here is a video I made to help explain this approach: https://www.youtube.com/watch?v=K5tgzLjEE9Q

I hope that helps.

