scala - 无法在 Spark Scala 中将 Seq[String] 传递给 .parquet
问题描述
我正在尝试使用该.parquet
方法在 Scala 中的 Spark API 中的一次调用中读取多个路径。
我有一个方法,它接收 aSeq[String]
但在方法调用中包含时似乎无法识别它并尝试检索 aString
而不是 a Seq[String]
。
def readPaths(sparkSession: SparkSession, basePath: String, inputPaths: Seq[String]): Dataset[Row] = {
sparkSession.read
.option("basepath", basePath)
.parquet(inputPaths) // Doesn't accept 'inputPaths'
}
在评论部分,它只是抱怨Seq[String]
不是一个String
类型对象,同时它确实接受了一个普通的"", "", "", ""
。
解决方案
这:
def parquet(paths: String*): DataFrame
方法需要一个可变参数,而不是明确的 Seq。因此,在 Scala 中,您必须将其传递为:
def readPaths(sparkSession: SparkSession, basePath: String, inputPaths: Seq[String]): Dataset[Row] = {
sparkSession.read
.option("basepath", basePath)
.parquet(inputPaths:_*)
}
请注意 val 末尾的“:_ *”。
在 spark2-shell 上验证(使用 Spark 2.3.0.cloudera3):
scala> case class MyProduct(key: Int, value: String, lastSeen: java.sql.Timestamp)
defined class MyProduct
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val baseDS = spark.createDataset(0 until 1000).map(i => MyProduct(i, s"valueOf:$i", new java.sql.Timestamp(System.currentTimeMillis())))
baseDS: org.apache.spark.sql.Dataset[MyProduct] = [key: int, value: string ... 1 more field]
scala> baseDS.withColumn("state", lit("IT"))
res10: org.apache.spark.sql.DataFrame = [key: int, value: string ... 2 more fields]
scala> res10.write.mode("overwrite").partitionBy("state").parquet("/tmp/test/parquet/")
scala> baseDS.withColumn("state", lit("US"))
res12: org.apache.spark.sql.DataFrame = [key: int, value: string ... 2 more fields]
scala> res12.write.mode("append").partitionBy("state").parquet("/tmp/test/parquet/")
scala> val inputPaths = Seq("/tmp/test/parquet/state=IT", "/tmp/test/parquet/state=US")
inputPaths: Seq[String] = List(/tmp/test/parquet/state=IT, /tmp/test/parquet/state=US)
scala> val data = spark.read.parquet(inputPaths)
<console>:38: error: overloaded method value parquet with alternatives:
(paths: String*)org.apache.spark.sql.DataFrame <and>
(path: String)org.apache.spark.sql.DataFrame
cannot be applied to (Seq[String])
val data = spark.read.parquet(inputPaths)
^
scala> val data = spark.read.parquet(inputPaths:_*)
data: org.apache.spark.sql.DataFrame = [key: int, value: string ... 1 more field]
scala> data.show(10)
+---+-----------+--------------------+
|key| value| lastSeen|
+---+-----------+--------------------+
|500|valueOf:500|2019-02-04 17:05:...|
|501|valueOf:501|2019-02-04 17:05:...|
|502|valueOf:502|2019-02-04 17:05:...|
|503|valueOf:503|2019-02-04 17:05:...|
|504|valueOf:504|2019-02-04 17:05:...|
|505|valueOf:505|2019-02-04 17:05:...|
|506|valueOf:506|2019-02-04 17:05:...|
|507|valueOf:507|2019-02-04 17:05:...|
|508|valueOf:508|2019-02-04 17:05:...|
|509|valueOf:509|2019-02-04 17:05:...|
+---+-----------+--------------------+
only showing top 10 rows
scala>
推荐阅读
- node.js - 代理在本地机器上工作,但在 RHELL 服务器上不工作
- oracle - 在 PL/SQL 中使用批量插入
- c++ - 用于 Visual Studio 代码的 Arduino 多文件代码
- powershell - 将带有撇号的文件名的批处理文件传递给powershell命令
- angular - Angular 10 Formly + 材质垂直多选框
- laravel - 附加多态关系?
- javascript - jQuery / Javascript总和按预期计算
- javascript - 调用堆栈超出快速排序功能
- r - 通过使用 dplyr 添加前缀重命名除 id 列之外的所有列
- python - 在 Python 这样的编程语言和 Tensorflow 这样的平台中,“API”是什么意思?