首页 > 解决方案 > spark - 在不使用案例类的情况下为 scala 模拟 pyspark asDict()

问题描述

Pyspark 允许您在使用以下方法从数据帧返回单行时创建字典。

t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).collect()[0].asDict()
print(t)
print(t["key"])
print(t["value"])
print(t["rw"])
print("Printing using for comprehension")
[print(t[i]) for i in t ]

Results:

{'key': 'spark.app.id', 'value': 'local-1594577194330', 'rw': 1}
spark.app.id
local-1594577194330
1
Printing using for comprehension
spark.app.id
local-1594577194330
1

我正在scala-spark中尝试相同的方法。可以使用案例类方法。

case class download(key:String, value:String,rw:Long)

val t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).as[download].first
println(t)
println(t.key)
println(t.value)
println(t.rw)

结果:

download(spark.app.id,local-1594580739413,1)
spark.app.id
local-1594580739413
1

在实际问题中,我有近 200 多列,不想使用案例类方法。我正在尝试类似下面的方法来避免使用案例类选项。

val df =spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)"))

(df.columns).zip(df.take(1)(0))

但得到错误。

    <console>:28: error: type mismatch;
 found   : (String, String, Long)
 required: Iterator[?]
       (df.columns.toIterator).zip(df.take(1)(0))

有没有办法解决这个问题。

标签: scalaapache-sparkpyspark

解决方案


问题在于,这zip是一个集合上的方法,它只能采用另一个实现IterableOnce的集合对象,并且df.take(1)(0)是一个 Spark SQL Row,它不属于该类别。

尝试Seq使用它的toSeq方法将该行转换为 a 。

df.columns.zip(df.take(1)(0).toSeq)

结果:

Array((key,spark.app.id), (value,local-1594577194330), (rw,1))

推荐阅读