Parsing unstructured web logs into a structured format

Problem description

I have a file that contains records like the following.

Input (raw log record):

50.57.190.149 - - [22/Apr/2012:07:12:41 +0530] "GET /computers/laptops.html?brand=819 HTTP/1.0" 200 12530 "-" "-"

Output (processed log record):

50.57.190.149 - - 22/Apr/2012:07:12:41 +0530 GET /computers/laptops.html?brand=819 HTTP/1.0 computers - - laptops.html brand=819 200 12530 - -

Input data format (the sketch after this list maps each field to the sample record):

  1. Remote IP
  2. Remote log name
  3. User
  4. Time
  5. Request string
  6. Status code
  7. Byte string
  8. User agent
  9. Referral
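
For reference, here is one reading of how these nine fields line up against the sample record above (a sketch; the user_agent/referral order follows the field list, and both happen to be "-" in this record):

val sampleFields = Seq(
  "remote_IP"       -> "50.57.190.149",
  "remote_log_name" -> "-",
  "user"            -> "-",
  "time"            -> "[22/Apr/2012:07:12:41 +0530]",
  "request_string"  -> "GET /computers/laptops.html?brand=819 HTTP/1.0",
  "status_code"     -> "200",
  "byte_string"     -> "12530",
  "user_agent"      -> "-",
  "referral"        -> "-"
)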

Here is my code:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object unStructuredToStructured {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("unStructuredToStructured")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Read the raw log file as an RDD of lines.
    val rdd1 = spark.read.textFile("C:\\Users\\LENOVO\\Downloads\\Veeresh_study\\DataSet_from_OldSessions\\weblogs\\weblogs_1_rec.txt").rdd

    // One nullable string column per field of the log record.
    val schemaString = "remote_IP remote_log_name user time request_string status_code byte_string user_agent referral"

    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))

    val schema = StructType(fields)

    // Split each line on spaces and take the first nine tokens as the nine columns.
    val rowRDD = rdd1.map(x => x.split(" "))
      .map(attributes => Row(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4),
        attributes(5), attributes(6), attributes(7), attributes(8)))

    val data = spark.createDataFrame(rowRDD, schema)
    data.show()
  }
}

Output:

(Screenshot: this is the output I get.)

As you can see from the screenshot,

because we use a space as the delimiter, a single field's value gets split across multiple columns (the field value itself contains spaces).

For example: ideally the value of the "time" column should be "[22/Apr/2012:07:12:41 +0530]", but here it is split across two columns, "time" and "request_string".

Similarly, the value of "request_string" ("GET /computers/laptops.html?brand=819 HTTP/1.0") spills into "status_code", "byte_string" and "user_agent".

Please help me parse the field values so that the spaces inside a field value are not treated as delimiters.
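
To make the failure mode concrete, here is a minimal sketch (plain Scala, using the sample record above) showing where the space-split lands the time and request fields:

val line = "50.57.190.149 - - [22/Apr/2012:07:12:41 +0530] \"GET /computers/laptops.html?brand=819 HTTP/1.0\" 200 12530 \"-\" \"-\""
val tokens = line.split(" ")
// tokens(3) == "[22/Apr/2012:07:12:41"             -> first half of the time field
// tokens(4) == "+0530]"                            -> second half of the time field
// tokens(5) == "\"GET"                             -> start of the request string
// tokens(6) == "/computers/laptops.html?brand=819"
// tokens(7) == "HTTP/1.0\""                        -> end of the request string
// tokens(8) == "200", tokens(9) == "12530"         -> the actual status code and byte count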

Tags: scala, apache-spark, apache-spark-sql

Solution


After quite a few attempts I found a solution; the solution below can certainly be improved.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object unStructuredToStructured {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("unStructuredToStructured")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val rdd1 = spark.read.textFile("C:\\Users\\LENOVO\\Downloads\\Veeresh_study\\DataSet_from_OldSessions\\weblogs\\weblogs_10_lakh_rec.txt").rdd

    val schemaString = "remote_IP remote_log_name user time request_string status_code byte_string user_agent referral"

    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))

    val schema = StructType(fields)

    // Re-assemble the space-split tokens into the nine logical fields:
    //   tokens 0-2        -> remote_IP, remote_log_name, user
    //   tokens 3-4        -> time            (the timestamp contains one space)
    //   tokens 5-7        -> request_string  (method, URL, protocol)
    //   tokens 8-9        -> status_code, byte_string
    //   tokens 10..last-1 -> user_agent      (may itself contain spaces)
    //   last token        -> referral
    // Note: concat joins the tokens back together without re-inserting the spaces.
    def combiner(arr: Array[String]): Row = {
      val len = arr.length

      val remoteIp      = arr(0)
      val remoteLogName = arr(1)
      val user          = arr(2)
      val time          = arr(3).concat(arr(4))
      val request       = arr(5).concat(arr(6)).concat(arr(7))
      val statusCode    = arr(8)
      val byteString    = arr(9)
      val referral      = arr.last

      // Glue together everything between the byte count and the last token,
      // so a user agent that contains spaces ends up in a single column.
      var userAgent: String = null
      for (i <- 10 until len - 1) {
        if (userAgent == null)
          userAgent = arr(i)
        else
          userAgent = userAgent.concat(arr(i))
      }

      Row(remoteIp, remoteLogName, user, time, request, statusCode, byteString, userAgent, referral)
    }

    val rowRDD1 = rdd1.map(x => x.split(" "))
      .map(attributes => combiner(attributes))
    rowRDD1.foreach(println)

    val data = spark.createDataFrame(rowRDD1, schema)
    data.show()
  }
}

(Screenshot: the resulting output.)
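
One possible improvement (a sketch of an alternative, not the approach above): parse each line with a single regular expression instead of re-joining the split tokens; this also preserves the spaces inside the time and request fields. The pattern and the user_agent/referral ordering below are assumptions based on the sample record in the question.

import org.apache.spark.sql.Row

// Assumed layout: ip, log name, user, [time], "request", status, bytes, then two quoted
// fields mapped to user_agent and referral in the same order as the schema above.
val logPattern = """^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\S+) (\S+) "([^"]*)" "([^"]*)"$""".r

def parseLine(line: String): Row = line match {
  case logPattern(ip, logName, user, time, request, status, bytes, agent, referral) =>
    Row(ip, logName, user, time, request, status, bytes, agent, referral)
  case _ =>
    // Keep malformed lines instead of failing the job; every column becomes null.
    Row(null, null, null, null, null, null, null, null, null)
}

// val data2 = spark.createDataFrame(rdd1.map(parseLine), schema)
// data2.show()

With the regex, the time column keeps its "+0530" offset and the request string keeps its internal spaces, whereas the concat-based combiner joins the tokens back together without separators.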

