首页 > 解决方案 > 将模式应用于文本文件的一部分

问题描述

我有以下格式的文本文件:

1 | 2 | 3 | 4 
6 | 7 | 8 | 9 | 0
a1 | b1 | c1 | d1 | f1 | g1 | i1 
a2 | b2 | c2 | d2 | f2 | g2 | i2  
a3 | b3 | c3 | d3 | f3 | g3 | i3 
a4 | b4 | c4 | d4 | f4 | g4 | i4  
a5 | b5 | c5 | d5 | f5 | g5 | i5 

我在 Scala 中将这个文件读为:

val df = ss.read
      .format("csv")
      .option("delimiter","|"))
      .option("header", "false")
      .load("hdfsDir/myfile.txt")

现在我想对第一条记录、第二条记录和我的文本文件的其余部分应用三种不同的方案,这意味着:

 1 | 2 | 3 | 4 <<== first schema 
 6 | 7 | 8 | 9 | 0  <<== second schema 

第三个模式应该应用于文件的其余部分

 a1 | b1 | c1 | d1 | f1 | g1 | i1 
 a2 | b2 | c2 | d2 | f2 | g2 | i2 
 a3 | b3 | c3 | d3 | f3 | g3 | i3  
 a4 | b4 | c4 | d4 | f4 | g4 | i4  
 a5 | b5 | c5 | d5 | f5 | g5 | i5 

为此,我创建了三种不同的方案:

val firstSchema=StructType(Array(
                StructField("ones",StringType,nullable=true),
                StructField("twos",StringType,nullable=true),
                StructField("threes",StringType,nullable=true),
                StructField("fours",StringType,nullable=true)
                ));

val secondSchema=StructType(Array(                                        
                    StructField("sixes",StringType,nullable=true),
                    StructField("sevens",StringType,nullable=true),
                    StructField("eights",StringType,nullable=true),
                    StructField("nines",StringType,nullable=true),
                    StructField("tens",StringType,nullable=true)
                    StructField("zeros",StringType,nullable=true)
                    ));

val restSchema=StructType(Array(
                    StructField("firstfield",StringType,nullable=true),
                    StructField("secondfield",StringType,nullable=true),
                    StructField("thirdfield",StringType,nullable=true),
                    StructField("fourthfield",StringType,nullable=true),
                    StructField("fifthfield",StringType,nullable=true),
                    StructField("sixthfield",StringType,nullable=true),
                    StructField("seventhfield",StringType,nullable=true)
                    ));

现在我想将前面的三个方案应用到文本文件的三个部分(第一条记录,第二条记录,其余记录)

我尝试使用以下代码将第一个模式应用于第一条记录:

val firstdf = spark.createDataFrame(
  df.head,
  firstSchema
)

val seconddf = spark.createDataFrame(
      df.take(2).drop(1),
      secondSchema
    )

val restdf = spark.createDataFrame(
      df,
      restSchema
    )

但它不起作用?

有什么建议么?

提前致谢

标签: scaladataframeapache-sparkschema

解决方案


我发现解决此问题的最简单方法是将一个通用架构应用于整个文本文件,然后从架构中取出第一行和第二行:

val restSchema=StructType(Array(
                    StructField("firstfield",StringType,nullable=true),
                    StructField("secondfield",StringType,nullable=true),
                    StructField("thirdfield",StringType,nullable=true),
                    StructField("fourthfield",StringType,nullable=true),
                    StructField("fifthfield",StringType,nullable=true),
                    StructField("sixthfield",StringType,nullable=true),
                    StructField("seventhfield",StringType,nullable=true)
                    ));
val df = ss.read
      .format("csv")
      .option("delimiter","|"))
      .option("header", "false")
      .schema(restSchema)
      .load("hdfsDir/myfile.txt")


 val firstdf= df.filter("firstfield= '1'").select
 val seconddf= df.filter("firstfield= '6'").select

df
.withColumn("rnk",row_number().over(Window.orderBy($"firstfield")))
.where($"rnk">2).drop($"rnk")

希望这能有所帮助。


推荐阅读