首页 > 解决方案 > 当 foldLeft 应用于 dataFrame 时如何理解输出数据?

问题描述

我正在尝试使用 Scala 中的 foldLeft 和 regex_replace 从 Dataframe 的字符串列中删除换行符。数据框是在读取 RDBMS 表后创建的:postgres 上的 public.test_sid。该表有 4 列:id, id1, id2, id3并且其中id3有一个换行符。这就是我编写逻辑的方式:

var conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.default.parallelism", "20")
val conFile       = "testconnection.properties"
val properties    = new Properties()
properties.load(new FileInputStream(conFile))
val connectionUrl = properties.getProperty("gpDevUrl")
val devUserName   = properties.getProperty("devUserName")
val devPassword   = properties.getProperty("devPassword")
val driverClass   = properties.getProperty("gpDriverClass")
try {
  Class.forName(driverClass).newInstance()
} catch {
  case cnf: ClassNotFoundException =>
    System.exit(1)
  case e: Exception =>
    System.exit(1)
}
def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
  import spark.implicits._
  val spColsDF = spark.read.format("jdbc").option("url",connectionUrl).option("dbtable", "(select * from public.test_sid) as sampleTab").option("user", devUserName).option("password", devPassword).load()
  val strCols = spColsDF.schema.fields.filter(_.dataType==StringType).map(_.name)
  val finalDF = strCols.foldLeft(spColsDF){ (tempdf, colName) => tempdf.withColumn(colName, regexp_replace(col(colName), "\\n", " ")) }
  println("----------------------------------------------------------------------------------")
  spColsDF.show()
  println("----------------------------------------------------------------------------------")
  finalDF.show()
  println("----------------------------------------------------------------------------------")
}

在输出日志 From dataFrame:spColsDF中,我看到了插入格式的数据。

+--------------------+--------------------+----+---+
|                  id|                 id1| id2|id3|
+--------------------+--------------------+----+---+
|1.000000000000000000|1.000000000000000000|   a|
a|
+--------------------+--------------------+----+---+

但是其中的数据以finalDF一种奇怪的格式出现。最后一列的值:id3 位于数据帧的开头和第一列的第一位:id 被截断并显示为“000000000000000000”而不是“1.000000000000000000”。

+--------------------+--------------------+----+---+
|                  id|                 id1| id2|id3|
+--------------------+--------------------+----+---+
 a|000000000000000000|1.000000000000000000|   a|
+--------------------+--------------------+----+---+

列数据:id3先到

如果我只是从 finalDF 访问该特定列:id3,我会得到如下数据:

scala> finalDF.select(finalDF("id3")).show
+---+
|id3|
+---+
 a|
+---+

scala> finalDF.select(finalDF("id")).show
+--------------------+
|                  id|
+--------------------+
|1.000000000000000000|
+--------------------+

这只是打印数据时的控制台问题还是代码中存在缺陷?如果我上面写的代码有任何问题,谁能告诉我。

标签: scalaapache-spark

解决方案


看起来像一个 CRLF(又名 \r\n)问题。

从历史上看,一个用于告诉控制台在行首返回,另一个用于创建新行。

在这里,您似乎抑制了新行的创建,但仍有“回到开头”部分。

所以我建议你同时 替换\r\n

参见关于 CR 的维基百科

回车,有时称为墨盒回车,通常缩写为 CR 或回车,是用于将设备位置重置为文本行开头的控制字符或机制。它与换行和换行概念密切相关,尽管它可以单独考虑。

要查看它的实际效果,让我们创建一个测试数据框:

scala> val dataframe = Seq(
     ("normal", "normal"), 
     ("withLF", "normal\n"), 
     ("withCRLF", "normal\r\n")).toDF("id", "value")
dataframe: org.apache.spark.sql.DataFrame = [id: string, value: string]

scala> dataframe.show
+--------+--------+
|      id|   value|
+--------+--------+
|  normal|  normal|
|  withLF| normal
|
|withCRLF|normal
|
+--------+--------+

在这里,我们看到带有“\r\n”和“\n”的字符串存在您在帖子开头观察到的问题。现在,如果我使用你替换功能:

dataframe.withColumn("value", regexp_replace($"value", "\n", "")).show
+--------+-------+
|      id|  value|
+--------+-------+
|  normal| normal|
|  withLF| normal|
|withCRLF|normal
+--------+-------+

我们看到我们已经解决了“\n”的情况,但没有解决“\r\n”。因此,如果您真的想使用正则表达式进行搜索/替换,您应该声明 CR 和 LF 都被替换:

scala> dataframe.withColumn("value", regexp_replace($"value", "[\r\n]+", " ")).show
+--------+-------+
|      id|  value|
+--------+-------+
|  normal| normal|
|  withLF|normal |
|withCRLF|normal |
+--------+-------+

(可能的增强:不使用正则表达式进行单个字符替换。不要在输出字符串的末尾输出空白字符,...)。


推荐阅读