Scala - How to do placeholder replacement in a Spark DataFrame column from a file?

Problem Description

MyPlaceHolder.json

[[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],    
  [" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"],[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],    
  [" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"],[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],    
  [" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"],[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],    
  [" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"]]

Basically, I need to read this file and replace the patterns in a DataFrame column with the corresponding placeholders.

For example: any pattern like "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)" should get replaced with " PHPHONENUMBER ".
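
For instance, applied to a single made-up string, the effect looks roughly like this (a quick Scala sketch, not from the original post, using the first pattern from the file):

val phonePattern =
  "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)".r

// Made-up input text; the whole phone-number match is swapped for the placeholder.
val masked = phonePattern.replaceAllIn("call me at +49 1234 5678 today", " PHPHONENUMBER ")
// masked should be: "call me at PHPHONENUMBER today"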

In Python I did something like the following:

import json
import os
import re

replacement_patterns = get_config_object__(os.getcwd() + REPLACEMENT_PATTERN_FILE_PATH)


def placeholder_replacement(text, replacement_patterns):
    """
     This function replaces placeholders according to replacement_patterns.

     Parameters
     ----------
     text : String
         Input string to the function.

     replacement_patterns : json
         JSON object containing the (placeholder, pattern) pairs.

     Returns
     -------
     text : String
         Output string with the placeholders substituted.
     """

    for replacement, pattern in replacement_patterns:
        text = re.compile(pattern, re.IGNORECASE | re.UNICODE).sub(replacement, text)
    return text

def get_config_object__(config_file_path):
    """
     This function loads the configuration object from a JSON file.

     Parameters
     ----------
     config_file_path : str
         Configuration path.

     Returns
     -------
     config_object : JSON object
         Configuration object.
     """

    config_file = open(config_file_path)
    config_object = json.load(config_file)
    config_file.close()
    return config_object

How can I do this kind of file-driven replacement on a DataFrame column?

Note: I cannot change the file; this placeholder.json is cross-used elsewhere. (I know it's not really JSON, but I can't help it.)

It's inside the resources folder.

Here is what I'm trying, but it's just an experiment. Please feel free to suggest something out of the box. Nothing has worked so far; I've tried different approaches, but since I'm new to this language I need help.

    val inputPath = getClass.getResource("/input_data/placeholder_replacement.txt").getPath

    val inputDF = spark.read.option("delimiter", "|").option("header", true).option("ignoreLeadingWhiteSpace", true).option("ignoreTrailingWhiteSpace", true).csv(inputPath)


    val replacement_pattern = getClass.getResource("/unitmetrics-replacement-patterns.json").getPath

    val replacement_pattern_DF = (spark.read.text(replacement_pattern))


    val myval = replacement_pattern_DF.rdd.map(row => row.getString(0).split("],").toList).collect()


    val removeNonGermanLetterFunction = udf((col: String) => {



      myval.foreach { x =>

        x.foreach { x =>

          var key = x.split("\",")(0).replaceAll("[^0-9a-zA-ZäöüßÄÖÜẞ _]", "")
          var value = x.split("\",")(1).replaceAll("\"", "")

          val regex = value.r

          regex.replaceAllIn(col, key)


        }
      }
    }
    )


    val input = inputDF.withColumn("new", removeNonGermanLetterFunction(col("duplicate_word_col")))

    input.show()

Tags: scala, apache-spark, dataframe

Solution


You should use the Spark DataFrame (a.k.a. Spark SQL) API whenever possible, rather than the lower-level RDD API you showed (rdd.map() ... rdd.foreach()).

This usually means loading the data into a DataFrame df and then using df.withColumn() to create new columns by applying transformations to existing ones. RDDs are still used underneath, but many optimizations are done for you by going through the high-level DataFrame API.

Here is a small Scala application showing how to apply pattern replacements to a DataFrame with the Spark SQL function regexp_replace:

import org.apache.log4j.{Logger, Level}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Column

object Main {

  def main(args: Array[String]): Unit = {

    // Set logging level to avoid Spark log spam
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Build Spark SQL session (mine is version 2.3.2)
    val spark = SparkSession.builder
      .appName("scalaTest1")
      .master("local[*]")
      .getOrCreate()

    // Import required to use Spark SQL methods like toDF() and calling columns with '
    import spark.implicits._

    // Create some basic DataFrame
    val df1 = List(
      (1, "I got pattern1 and pattern2."),
      (2, "I don't have any."),
      (3, "Oh, that pattern1 I have too.")
    ).toDF("id", "sentence")

    df1.show(false)
    //+---+-----------------------------+
    //|id |sentence                     |
    //+---+-----------------------------+
    //|1  |I got pattern1 and pattern2. |
    //|2  |I don't have any.            |
    //|3  |Oh, that pattern1 I have too.|
    //+---+-----------------------------+

    // Create replacements map
    val replacements = Map(
      "pattern1" -> "replacement1",
      "pattern2" -> "replacement2",
      "I " -> "you "
    )

    // Import required to use functions on DataFrame columns such as regexp_replace()
    import org.apache.spark.sql.functions._

    // Create a new column with one of the replacements applied to "sentence" column
    val df2 = df1.withColumn(
      "new",
      regexp_replace('sentence, "pattern1", replacements("pattern1"))
    )

    df2.show(false)
    //+---+-----------------------------+---------------------------------+
    //|id |sentence                     |new                              |
    //+---+-----------------------------+---------------------------------+
    //|1  |I got pattern1 and pattern2. |I got replacement1 and pattern2. |
    //|2  |I don't have any.            |I don't have any.                |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 I have too.|
    //+---+-----------------------------+---------------------------------+

    // With the first two replacements applied to "sentence" column by nesting one inside the other
    val df3 = df1.withColumn(
      "new",
      regexp_replace(
        regexp_replace('sentence, "pattern2", replacements("pattern2")),
        "pattern1",
        replacements("pattern1")
      )
    )

    df3.show(false)
    //+---+-----------------------------+------------------------------------+
    //|id |sentence                     |new                                 |
    //+---+-----------------------------+------------------------------------+
    //|1  |I got pattern1 and pattern2. |I got replacement1 and replacement2.|
    //|2  |I don't have any.            |I don't have any.                   |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 I have too.   |
    //+---+-----------------------------+------------------------------------+

    // Same, but applying all replacements recursively with "foldLeft" instead of nesting every replacement
    val df4 = df1.withColumn(
      "new",
      replacements.foldLeft(df1("sentence")) {
        case (c: Column, (pattern: String, replacement: String)) => regexp_replace(c, pattern, replacement)
      }
    )
    df4.show(false)
    //+---+-----------------------------+--------------------------------------+
    //|id |sentence                     |new                                   |
    //+---+-----------------------------+--------------------------------------+
    //|1  |I got pattern1 and pattern2. |you got replacement1 and replacement2.|
    //|2  |I don't have any.            |you don't have any.                   |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 you have too.   |
    //+---+-----------------------------+--------------------------------------+

    // Select the columns you want to keep and rename if necessary
    val df5 = df4.select('id, 'new).withColumnRenamed("new", "sentence")
    df5.show(false)
    //+---+--------------------------------------+
    //|id |sentence                              |
    //+---+--------------------------------------+
    //|1  |you got replacement1 and replacement2.|
    //|2  |you don't have any.                   |
    //|3  |Oh, that replacement1 you have too.   |
    //+---+--------------------------------------+

  }

}

There are several libraries in Scala for reading JSON. Here I'll use the Spark SQL method spark.read.json(path) so as not to add another dependency, even though using Spark to read such a small file may be considered overkill.

Note that the function I'm using expects a specific file format with one valid JSON object per line, and you need to be able to map the JSON fields to the DataFrame's columns.

Here is the content of the replacements.json file I created:

{"pattern":"pattern1" , "replacement": "replacement1"}
{"pattern":"pattern2" , "replacement": "replacement2"}
{"pattern":"I " , "replacement": "you "}

Here is the small application rewritten to read the replacements from that file, put them into a Map, and then apply them to the data using the foldLeft approach shown at the end of the previous example.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Column, SparkSession}

object Main2 {

  def main(args: Array[String]): Unit = {

    // Set logging level to avoid Spark log spam
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Build Spark SQL session (mine is version 2.3.2)
    val spark = SparkSession.builder
      .appName("scalaTest1")
      .master("local[*]")
      .getOrCreate()

    // Import required to use Spark SQL methods like toDF() and calling columns with '
    import spark.implicits._
    // Import required to use functions on DataFrame columns such as regexp_replace()
    import org.apache.spark.sql.functions._


    // Create some basic DataFrame
    val df1 = List(
      (1, "I got pattern1 and pattern2."),
      (2, "I don't have any."),
      (3, "Oh, that pattern1 I have too.")
    ).toDF("id", "sentence")
    df1.show(false)
    //+---+-----------------------------+
    //|id |sentence                     |
    //+---+-----------------------------+
    //|1  |I got pattern1 and pattern2. |
    //|2  |I don't have any.            |
    //|3  |Oh, that pattern1 I have too.|
    //+---+-----------------------------+

    // Read replacements json file into a DataFrame
    val replacements_path = "/path/to/your/replacements.json"
    val replacements_df = spark.read.json(replacements_path)
    replacements_df.show(false)
    //+--------+------------+
    //|pattern |replacement |
    //+--------+------------+
    //|pattern1|replacement1|
    //|pattern2|replacement2|
    //|I       |you         |
    //+--------+------------+

    // Turn DataFrame into a Map for ease of use in next step
    val replacements_map = replacements_df
      .collect() // Brings all the df data from all Spark executors to the Spark driver, use only if df is small!
      .map(row => (row.getAs[String]("pattern"), row.getAs[String]("replacement")))
      .toMap
    print(replacements_map)
    // Map(pattern1 -> replacement1, pattern2 -> replacement2, I  -> you )

    // Apply replacements recursively with "foldLeft"
    val df2 = df1.withColumn(
      "new",
      replacements_map.foldLeft(df1("sentence")) {
        case (c: Column, (pattern: String, replacement: String)) => regexp_replace(c, pattern, replacement)
      }
    )
    df2.show(false)
    //+---+-----------------------------+--------------------------------------+
    //|id |sentence                     |new                                   |
    //+---+-----------------------------+--------------------------------------+
    //|1  |I got pattern1 and pattern2. |you got replacement1 and replacement2.|
    //|2  |I don't have any.            |you don't have any.                   |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 you have too.   |
    //+---+-----------------------------+--------------------------------------+

    // Select the columns you want to keep and rename if necessary
    val df3 = df2.select('id, 'new).withColumnRenamed("new", "sentence")
    df3.show(false)
    //+---+--------------------------------------+
    //|id |sentence                              |
    //+---+--------------------------------------+
    //|1  |you got replacement1 and replacement2.|
    //|2  |you don't have any.                   |
    //|3  |Oh, that replacement1 you have too.   |
    //+---+--------------------------------------+

  }

}
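
The question's MyPlaceHolder.json keeps the pairs as a JSON array of [placeholder, pattern] arrays rather than one JSON object per line. If that file really cannot be changed, a minimal sketch of loading it into the same kind of map could look like the following (assuming json4s, which Spark already pulls in as a dependency; the resource path is hypothetical, adjust it to yours):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

import scala.io.Source

// Read the whole resource file into one string (hypothetical path).
val raw = Source
  .fromInputStream(getClass.getResourceAsStream("/MyPlaceHolder.json"))
  .mkString

implicit val formats: Formats = DefaultFormats

// Each entry is [placeholder, pattern]; regexp_replace(column, pattern, replacement)
// takes the pattern first, so swap the order while building the map.
val replacements_map: Map[String, String] = parse(raw)
  .extract[List[List[String]]]
  .map(pair => pair(1) -> pair(0))
  .toMap

From there the foldLeft step shown in Main2 applies unchanged; duplicate entries in the file simply collapse into a single map key.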

In your final application, remove the df.show() and print() calls. Spark "transformations" are "lazy": Spark only stacks the operations you ask for into an execution graph (DAG) without executing anything. Only when you force an action, e.g. when you call df.show() or write data somewhere with df.save() (these are called "actions"), does it analyze the DAG, optimize it, and actually run the transformations on the data. That's why you should avoid calling actions such as df.show() on intermediate transformations.
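
As a rough illustration of that laziness (a sketch reusing df1, spark.implicits._ and org.apache.spark.sql.functions._ from the examples above):

// Transformations: Spark only records them in the execution plan; nothing runs yet.
val planned = df1
  .withColumn("upper_sentence", upper('sentence))
  .filter(length('sentence) > 10)

// Action: only now does Spark optimize the plan and actually execute it.
planned.show(false)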

