首页 > 解决方案 > 如何将文件列中的值列表解析为Spark sql Dataframe

问题描述

我仍然是 spark scala 的新手,我需要从 hive 中的每个表中提取第一个分区。我已经在一个单独的文本文件中提取了表列表并创建为一个序列,我不知道如何将每个序列值解析为“显示分区 test_hive_database”。

scala> import scala.io.Source
import scala.io.Source

scala> val filename = "text_tables.txt"
filename: String = text_tables.txt

Sample file containing the table list:
TABLE_NAME_A101
TABLE_NAME_A102
TABLE_NAME_A103
TABLE_NAME_B101
TABLE_NAME_C101

scala> val linestable = 
scala.io.Source.fromFile("text_tables.txt").getLines.toSeq
linestable: Seq[String] = Stream(TABLE_NAME_A101, ?)

下面是表中的第一个分区示例,我已将表与分区值连接起来。

scala> sql("show partitions test_hive_database.TABLE_NAME_A101").withColumn("new_column",concat(lit("TABLE_NAME_A101,"),'partition)).select("new_column").show(1,false)

+------------------------------------+
|new_column                          |
+------------------------------------+
|TABLE_NAME_A101,dta_ld_dt=2018-01-23|
+------------------------------------+
only showing top 1 row

尝试理解

scala> for(e <- linestable) yield (sql("show partitions test_hive_database.$e").withColumn("new_column",concat(lit("$e , "),'partition)).select("new_column").show(1,false))

org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '$' expecting {'SELECT', 'FROM', 'ADD'

预期结果

+------------------------------------+
|new_column                          |
+------------------------------------+
|TABLE_NAME_A101,dta_ld_dt=2018-01-23|
|TABLE_NAME_A102,dta_ld_dt=2018-02-28|
|TABLE_NAME_A103,dta_ld_dt=2018-03-31|
|TABLE_NAME_B101,dta_ld_dt=2018-04-30|
|TABLE_NAME_C101,dta_ld_dt=2019-01-30|
+------------------------------------+

实际结果:

我收到错误,我不确定这种方法是否正确。

如何将文件中的单列值解析为 spark sql(表名)并将结果附加到 csv 文件中?

标签: scalaapache-sparkapache-spark-sql

解决方案


推荐阅读