首页 > 解决方案 > 使用 .net-spark 加载具有多个部分的固定位置文件

问题描述

我正在尝试使用.net-spark在spark中加载具有多个部分的固定位置文件。以下是该文件的示例:

01Nikola Tesla                  tesla@gmail.com                                                       +5521981181569
02Creations                                       
03Alternating current                              
03Tesla coil                                 
01Thomas Edison                 edison@gmail.com                                                      +5521981181569
02Creations                                        
03Lamp                                         
03Phonograph                                      
03General Eletric                                 
03Cinema 

所以基本上我们有一个交易所有者的标题,一个子标题说明下面是交易,最后是交易部分。交易行不包含对所有者的任何引用,所以是的,这很棘手。

正如@EdElliott 所建议的,这是我们应该如何在 RDD 中查看数据(仅显示第一行,但目的是读取所有内容):

发明者 电子邮件 电话 创作
尼古拉·特斯拉 tesla@gmail.com +5511999999999 交流电

我想这不是现在很常见的文件格式,但在巴西的大银行中仍然很常见。

找到了 java 的这个例子,但它不处理部分。我相信我可以使用 UDF 来实现这一点,但同样,我不知道从哪里开始。欣赏这里的任何见解。

谢谢

标签: apache-spark.net-spark

解决方案


我通过了另一边。对解决方案并不感到自豪,主要是因为我使用了 ToLocalIterator() 以及我将参数传递给 normalizedTransaction UDF 的方式。谢谢@EdElliott,你的博客对我帮助很大。

无论如何,这里是:

static Func<Column, Column> OnlyHeaders = Udf<string, bool>(
        line =>  line.Substring(0,2).Equals("01")
    );

    static Func<Column, Column> OnlyTransactions = Udf<string, bool>(
        line =>  line.Substring(0,2).Equals("03")
    );

    static Func<Column, Column> breakHeader =
        Udf<string, string[]>((line) => GetHeader(line) );

    static Func<Column, Column> breakTransaction =
        Udf<string, string[]>((line) => GetTransaction(line) );

    static void Main(string[] args)
    {
        // Create a Spark session
        var spark = SparkSession
            .Builder()
            .AppName("FixedLenghtWithSectionsApp")
            .GetOrCreate();

        // Create initial DataFrame
        var rawDf = spark.Read().Schema("rawLine STRING").Text("resources/input.txt");
        rawDf = rawDf.WithColumn("rowNumber", Functions.MonotonicallyIncreasingId());
        rawDf.CreateOrReplaceTempView ("rawdata");

        var headersDf = GetHeadersDf(rawDf);
        headersDf.CreateOrReplaceTempView("headers");
        headersDf.Show();

        var transactionsDf = GetTransactionsDf(rawDf);
        transactionsDf.CreateOrReplaceTempView("transactions");
        transactionsDf.Show();
        var headerLines = headersDf.ToLocalIterator().ToList().Select( r => r.Get("rowNumber"));
        var columns = new StringBuilder();
        foreach(var h in headerLines)
            columns.Append($"{h.ToString()},");
        
        
        var column = Functions.Lit(columns.ToString()).Alias("ids");

        Func<Column, Column, Column> normalizedTransaction = Udf<string, string,int>( (line, hLines) => {

                var ids = hLines.Split(",", StringSplitOptions.RemoveEmptyEntries).Select(s => int.Parse(s));
                var id = ids.Where(h => h < int.Parse(line) );
                if (id.Any())
                    return id.Max();
                return -1;
                
            } );

        
        var inventionsDf = transactionsDf.Alias("one")
           .Select(
               Functions.Col("one.rowNumber"), 
               Functions.Col("invention"), 
               normalizedTransaction(Functions.Col("one.rowNumber").Cast("string") , column ).Alias("id")
            );

        inventionsDf = inventionsDf.Alias("one")
            .Join(
                headersDf.Alias("two")
                ,Functions.Col("one.id") == Functions.Col("two.rowNumber")
                ,"inner"
            );

        inventionsDf.Show();
        
        spark.Stop();
    }



    private static DataFrame GetTransactionsDf(DataFrame rawDf)
    {
        var transactionsRawsDf = rawDf
            .Select(rawDf["rowNumber"],breakTransaction(rawDf["rawLine"]).Alias("value"))
            .Where(OnlyTransactions(rawDf["rawLine"]));

        return transactionsRawsDf.Select(
            transactionsRawsDf["rowNumber"], 
            transactionsRawsDf.Col("value").GetItem(0))
            .ToDF("rowNumber", "invention");
        
    }

    private static DataFrame GetHeadersDf(DataFrame rawDf)
    {
        var headerRawsDf = rawDf
            .Select(rawDf["rowNumber"],breakHeader(rawDf["rawLine"]).Alias("value"))
            .Where(OnlyHeaders(rawDf["rawLine"]));
        
        
        return headerRawsDf.Select(
            headerRawsDf["rowNumber"], 
            headerRawsDf.Col("value").GetItem(0), 
            headerRawsDf.Col("value").GetItem(1),
            headerRawsDf.Col("value").GetItem(2))
            .ToDF("rowNumber", "inventor", "email", "phone");
    }

    private static string[] GetHeader(string line)
    {

        var columns = new List<string>();
            columns.Add(line.Substring(2,30));
            columns.Add(line.Substring(32,70));
            columns.Add(line.Substring(102,14));
            return columns.ToArray();
    }

     private static string[] GetTransaction(string line)
    {
        var columns = new List<string>();
            columns.Add(line.Substring(2,48));
            return columns.ToArray();
    }

推荐阅读