首页 > 解决方案 > 读取 parquet 文件后创建一个简单的 DF

问题描述

我是 Scala 的新开发人员,在 Spark Scala 上编写简单代码时遇到了一些问题。在阅读镶木地板文件后,我得到了这个 DF:

ID   Timestamp
1    0
1    10
1    11    
2    20
3    15

我想要的是从第一个 DF 创建一个 DF 结果(例如,如果 ID = 2,则时间戳应乘以 2)。所以,我创建了一个新类:

 case class OutputData(id: bigint, timestamp:bigint)

这是我的代码:

val tmp = spark.read.parquet("/user/test.parquet").select("id", "timestamp")


  val outputData:OutputData = tmp.map(x:Row => {

  var time_result


  if (x.getString("id") == 2) {
     time_result = x.getInt(2)* 2 
  }

  if (x.getString("id") == 1) {
     time_result = x.getInt(2) + 10
  }


  OutputData2(x.id, time_result)

})

case class OutputData2(id: bigint, timestamp:bigint)

你能帮我吗 ?

标签: scalaapache-spark

解决方案


为了使实现更容易,您可以df使用 a 进行转换case class,该过程使用对象表示法而不是每次您想要某个元素的值时都Dataset访问您的。row除此之外,根据您的输入和输出将采用相同的格式,您可以使用相同的案例类而不是定义 2。

代码如下:

// Sample intput data
val df = Seq(
  (1,    0L),
  (1,    10L),
  (1,    11L),   
  (2,    20L),
  (3,    15L)
).toDF("ID", "Timestamp")
df.show()

// Case class as helper
case class OutputData(ID: Integer, Timestamp: Long)

val newDF = df.as[OutputData].map(record=>{
  val newTime = if(record.ID == 2) record.Timestamp*2 else record.Timestamp // identify your id and apply logic based on that
  OutputData(record.ID, newTime)// return same format with updated values
})

newDF.show()

上述代码的输出:

// original
+---+---------+
| ID|Timestamp|
+---+---------+
|  1|        0|
|  1|       10|
|  1|       11|
|  2|       20|
|  3|       15|
+---+---------+
// new one
+---+---------+
| ID|Timestamp|
+---+---------+
|  1|        0|
|  1|       10|
|  1|       11|
|  2|       40|
|  3|       15|
+---+---------+

推荐阅读