scala - Scala: how to do placeholder replacement in a Spark DataFrame column using patterns from a file?
Problem description
MyPlaceHolder.json
[[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],
[" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"],[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],
[" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"],[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],
[" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"],[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],
[" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"]]
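Basically, I need to read this file and replace the patterns in a DF column with the placeholders.
For example: any match of a pattern like "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)" should be replaced with " PHPHONENUMBER ".
In Python I did something like the following.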
import json
import os
import re


def placeholder_replacement(text, replacement_patterns):
    """
    Replace every pattern match in text with its placeholder.

    Parameters
    ----------
    text : String
        Input string to the function.
    replacement_patterns : json
        JSON object holding [placeholder, pattern] pairs.

    Returns
    -------
    text : String
        Output string with the placeholders substituted.
    """
    for replacement, pattern in replacement_patterns:
        text = re.compile(pattern, re.IGNORECASE | re.UNICODE).sub(replacement, text)
    return text


def get_config_object__(config_file_path):
    """
    Load the configuration object from a JSON file.

    Parameters
    ----------
    config_file_path : str
        Configuration path.

    Returns
    -------
    config_object : JSON object
        Configuration object.
    """
    with open(config_file_path) as config_file:
        return json.load(config_file)


# REPLACEMENT_PATTERN_FILE_PATH is a constant that is not shown in the question
replacement_patterns = get_config_object__(os.getcwd() + REPLACEMENT_PATTERN_FILE_PATH)
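How can I apply this kind of file-driven replacement to a DataFrame column?
Note: I cannot change the file; it is used across projects as placeholder.json. (I know it's not JSON, but I can't help it.)
It is inside the resources folder.
Here is what I am trying, but it is only an experiment. Feel free to suggest something out of the box. Nothing has worked; I tried different approaches, but since I am new to this language, I need help.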
val inputPath = getClass.getResource("/input_data/placeholder_replacement.txt").getPath
val inputDF = spark.read
  .option("delimiter", "|")
  .option("header", true)
  .option("ignoreLeadingWhiteSpace", true)
  .option("ignoreTrailingWhiteSpace", true)
  .csv(inputPath)
val replacement_pattern = getClass.getResource("/unitmetrics-replacement-patterns.json").getPath
val replacement_pattern_DF = (spark.read.text(replacement_pattern))
val myval = replacement_pattern_DF.rdd.map(row => row.getString(0).split("],").toList).collect()
val removeNonGermanLetterFunction = udf((col: String) => {
  myval.foreach { x =>
    x.foreach { x =>
      var key = x.split("\",")(0).replaceAll("[^0-9a-zA-ZäöüßÄÖÜẞ _]", "")
      var value = x.split("\",")(1).replaceAll("\"", "")
      val regex = value.r
      regex.replaceAllIn(col, key)
    }
  }
})
val input = inputDF.withColumn("new", removeNonGermanLetterFunction(col("duplicate_word_col")))
input.show()
Solution
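You should use the Spark DataFrame (also known as Spark SQL) API whenever possible, rather than the lower-level RDD API you showed (rdd.map() ... rdd.foreach()).
This usually means loading the data into a DataFrame df, then using df.withColumn() to create new columns by applying transformations to existing ones. RDDs are still used underneath, but the high-level DataFrame API optimizes a lot of things for you.
Here is a small Scala application showing how to apply pattern replacements to a DataFrame with the Spark SQL function regexp_replace.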
import org.apache.log4j.{Logger, Level}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Column

object Main {
  def main(args: Array[String]): Unit = {
    // Set logging level to avoid Spark log spam
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Build Spark SQL session (mine is version 2.3.2)
    val spark = SparkSession.builder
      .appName("scalaTest1")
      .master("local[*]")
      .getOrCreate()

    // Import required to use Spark SQL methods like toDF() and calling columns with '
    import spark.implicits._

    // Create some basic DataFrame
    val df1 = List(
      (1, "I got pattern1 and pattern2."),
      (2, "I don't have any."),
      (3, "Oh, that pattern1 I have too.")
    ).toDF("id", "sentence")
    df1.show(false)
    //+---+-----------------------------+
    //|id |sentence                     |
    //+---+-----------------------------+
    //|1  |I got pattern1 and pattern2. |
    //|2  |I don't have any.            |
    //|3  |Oh, that pattern1 I have too.|
    //+---+-----------------------------+

    // Create replacements map
    val replacements = Map(
      "pattern1" -> "replacement1",
      "pattern2" -> "replacement2",
      "I " -> "you "
    )

    // Import required to use functions on DataFrame columns such as regexp_replace()
    import org.apache.spark.sql.functions._

    // Create a new column with one of the replacements applied to "sentence" column
    val df2 = df1.withColumn(
      "new",
      regexp_replace('sentence, "pattern1", replacements("pattern1"))
    )
    df2.show(false)
    //+---+-----------------------------+---------------------------------+
    //|id |sentence                     |new                              |
    //+---+-----------------------------+---------------------------------+
    //|1  |I got pattern1 and pattern2. |I got replacement1 and pattern2. |
    //|2  |I don't have any.            |I don't have any.                |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 I have too.|
    //+---+-----------------------------+---------------------------------+

    // With the first two replacements applied to "sentence" column by nesting one inside the other
    val df3 = df1.withColumn(
      "new",
      regexp_replace(
        regexp_replace('sentence, "pattern2", replacements("pattern2")),
        "pattern1",
        replacements("pattern1")
      )
    )
    df3.show(false)
    //+---+-----------------------------+------------------------------------+
    //|id |sentence                     |new                                 |
    //+---+-----------------------------+------------------------------------+
    //|1  |I got pattern1 and pattern2. |I got replacement1 and replacement2.|
    //|2  |I don't have any.            |I don't have any.                   |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 I have too.   |
    //+---+-----------------------------+------------------------------------+

    // Same, but applying all replacements recursively with "foldLeft" instead of nesting every replacement
    val df4 = df1.withColumn(
      "new",
      replacements.foldLeft(df1("sentence")) {
        case (c: Column, (pattern: String, replacement: String)) => regexp_replace(c, pattern, replacement)
      }
    )
    df4.show(false)
    //+---+-----------------------------+--------------------------------------+
    //|id |sentence                     |new                                   |
    //+---+-----------------------------+--------------------------------------+
    //|1  |I got pattern1 and pattern2. |you got replacement1 and replacement2.|
    //|2  |I don't have any.            |you don't have any.                   |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 you have too.   |
    //+---+-----------------------------+--------------------------------------+

    // Select the columns you want to keep and rename if necessary
    val df5 = df4.select('id, 'new).withColumnRenamed("new", "sentence")
    df5.show(false)
    //+---+--------------------------------------+
    //|id |sentence                              |
    //+---+--------------------------------------+
    //|1  |you got replacement1 and replacement2.|
    //|2  |you don't have any.                   |
    //|3  |Oh, that replacement1 you have too.   |
    //+---+--------------------------------------+
  }
}
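There are several libraries for reading JSON in Scala; here I will use the Spark SQL method spark.read.json(path) to avoid adding another dependency, even though using Spark to read such a small file may be considered overkill.
Note that the function I use expects a specific file format with one valid JSON object per line, and you should be able to map the JSON fields to the columns of a DataFrame.
Here is the content of the replacements.json file I created: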
{"pattern":"pattern1" , "replacement": "replacement1"}
{"pattern":"pattern2" , "replacement": "replacement2"}
{"pattern":"I " , "replacement": "you "}
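Here is the small application rewritten to read the replacements from that file, put them into a Map, and apply them to the data with the foldLeft approach shown at the end of the previous one.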
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Column, SparkSession}

object Main2 {
  def main(args: Array[String]): Unit = {
    // Set logging level to avoid Spark log spam
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Build Spark SQL session (mine is version 2.3.2)
    val spark = SparkSession.builder
      .appName("scalaTest1")
      .master("local[*]")
      .getOrCreate()

    // Import required to use Spark SQL methods like toDF() and calling columns with '
    import spark.implicits._
    // Import required to use functions on DataFrame columns such as regexp_replace()
    import org.apache.spark.sql.functions._

    // Create some basic DataFrame
    val df1 = List(
      (1, "I got pattern1 and pattern2."),
      (2, "I don't have any."),
      (3, "Oh, that pattern1 I have too.")
    ).toDF("id", "sentence")
    df1.show(false)
    //+---+-----------------------------+
    //|id |sentence                     |
    //+---+-----------------------------+
    //|1  |I got pattern1 and pattern2. |
    //|2  |I don't have any.            |
    //|3  |Oh, that pattern1 I have too.|
    //+---+-----------------------------+

    // Read replacements json file into a DataFrame
    val replacements_path = "/path/to/your/replacements.json"
    val replacements_df = spark.read.json(replacements_path)
    replacements_df.show(false)
    //+--------+------------+
    //|pattern |replacement |
    //+--------+------------+
    //|pattern1|replacement1|
    //|pattern2|replacement2|
    //|I       |you         |
    //+--------+------------+

    // Turn DataFrame into a Map for ease of use in next step
    val replacements_map = replacements_df
      .collect() // Brings all the df data from all Spark executors to the Spark driver, use only if df is small!
      .map(row => (row.getAs[String]("pattern"), row.getAs[String]("replacement")))
      .toMap
    print(replacements_map)
    // Map(pattern1 -> replacement1, pattern2 -> replacement2, I  -> you )

    // Apply replacements recursively with "foldLeft"
    val df2 = df1.withColumn(
      "new",
      replacements_map.foldLeft(df1("sentence")) {
        case (c: Column, (pattern: String, replacement: String)) => regexp_replace(c, pattern, replacement)
      }
    )
    df2.show(false)
    //+---+-----------------------------+--------------------------------------+
    //|id |sentence                     |new                                   |
    //+---+-----------------------------+--------------------------------------+
    //|1  |I got pattern1 and pattern2. |you got replacement1 and replacement2.|
    //|2  |I don't have any.            |you don't have any.                   |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 you have too.   |
    //+---+-----------------------------+--------------------------------------+

    // Select the columns you want to keep and rename if necessary
    val df3 = df2.select('id, 'new).withColumnRenamed("new", "sentence")
    df3.show(false)
    //+---+--------------------------------------+
    //|id |sentence                              |
    //+---+--------------------------------------+
    //|1  |you got replacement1 and replacement2.|
    //|2  |you don't have any.                   |
    //|3  |Oh, that replacement1 you have too.   |
    //+---+--------------------------------------+
  }
}
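The file in the question is not one object per line but a single array of [replacement, pattern] pairs, and the Python version matches case-insensitively and applies the pairs in file order. Below is a minimal sketch of the same fold on plain strings, assuming the pairs have already been parsed into an ordered Seq (the parsing step is not shown). The names PlaceholderFold and applyAll and the sample pattern are hypothetical, not taken from the question.

```scala
import java.util.regex.Pattern

object PlaceholderFold {
  // (replacement, pattern) pairs in file order. A Seq keeps that order;
  // a Map is not guaranteed to once it grows beyond a few entries.
  // The single pair below is an illustrative assumption, not the asker's data.
  val pairs: Seq[(String, String)] = Seq(
    " PHPHONENUMBER " -> "\\+\\d+[\\d\\s-]*\\d"
  )

  // Compile each pattern once, case-insensitive and Unicode-aware,
  // roughly mirroring re.IGNORECASE | re.UNICODE in the Python code.
  private val compiled: Seq[(String, Pattern)] = pairs.map {
    case (replacement, pattern) =>
      (replacement, Pattern.compile(pattern, Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE))
  }

  // Thread the text through every pattern and return the final string.
  // Unlike foreach, foldLeft yields a value, so the result is not lost.
  def applyAll(text: String): String =
    if (text == null) null
    else compiled.foldLeft(text) { case (acc, (replacement, p)) =>
      p.matcher(acc).replaceAll(replacement)
    }

  def main(args: Array[String]): Unit =
    println(applyAll("call +49 30 123456 now"))
}
```

Inside Spark, a function like this could be wrapped with udf(PlaceholderFold.applyAll _). Alternatively, the Column-based foldLeft shown above could iterate over the same Seq instead of a Map, with (?i) prefixed to each pattern so that regexp_replace matches case-insensitively.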
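In your final application, remove the df.show() and print() calls. Spark "transformations" are "lazy": Spark only stacks the operations you ask for into an execution graph (DAG) without executing them. Only when you force it to act, for example by calling df.show() somewhere or by writing the data out with df.write.save() (these are called "actions"), does it analyze the DAG, optimize it, and actually execute the transformations on the data. That is why you should avoid actions such as df.show() on intermediate transformations.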