scala - 读取 parquet 文件后创建一个简单的 DF
问题描述
我是 Scala 的新开发人员,在 Spark Scala 上编写简单代码时遇到了一些问题。在阅读镶木地板文件后,我得到了这个 DF:
ID Timestamp
1 0
1 10
1 11
2 20
3 15
我想要的是从第一个 DF 创建一个 DF 结果(例如,如果 ID = 2,则时间戳应乘以 2)。所以,我创建了一个新类:
case class OutputData(id: bigint, timestamp:bigint)
这是我的代码:
val tmp = spark.read.parquet("/user/test.parquet").select("id", "timestamp")
val outputData:OutputData = tmp.map(x:Row => {
var time_result
if (x.getString("id") == 2) {
time_result = x.getInt(2)* 2
}
if (x.getString("id") == 1) {
time_result = x.getInt(2) + 10
}
OutputData2(x.id, time_result)
})
case class OutputData2(id: bigint, timestamp:bigint)
你能帮我吗 ?
解决方案
为了使实现更容易,您可以df
使用 a 进行转换case class
,该过程使用对象表示法而不是每次您想要某个元素的值时都Dataset
访问您的。row
除此之外,根据您的输入和输出将采用相同的格式,您可以使用相同的案例类而不是定义 2。
代码如下:
// Sample intput data
val df = Seq(
(1, 0L),
(1, 10L),
(1, 11L),
(2, 20L),
(3, 15L)
).toDF("ID", "Timestamp")
df.show()
// Case class as helper
case class OutputData(ID: Integer, Timestamp: Long)
val newDF = df.as[OutputData].map(record=>{
val newTime = if(record.ID == 2) record.Timestamp*2 else record.Timestamp // identify your id and apply logic based on that
OutputData(record.ID, newTime)// return same format with updated values
})
newDF.show()
上述代码的输出:
// original
+---+---------+
| ID|Timestamp|
+---+---------+
| 1| 0|
| 1| 10|
| 1| 11|
| 2| 20|
| 3| 15|
+---+---------+
// new one
+---+---------+
| ID|Timestamp|
+---+---------+
| 1| 0|
| 1| 10|
| 1| 11|
| 2| 40|
| 3| 15|
+---+---------+
推荐阅读
- android - LiveData 观察者调用了两次
- intellij-idea - 有时不提供在 IntelliJ Idea 2021.2.3(终极版)中运行单一测试
- s3-kafka-connector - kafka s3 接收器连接器键和标头 s3 存储写入不起作用
- git - 如何在 GitHub 上进行两次更改进入两个单独的 PR?
- amazon-web-services - Appsync + Elasticsearch:执行对 OpenSearch 的请求时出现通信错误
- haskell - NFData 应该有一个对偶吗?
- azure-logic-apps - 使用 Azure 逻辑应用操作更改 JSON 属性名称并删除属性
- javascript - 无法返回嵌套调用的 Outlook 函数
- sql - RDBMS 中“OneOf”关系的最佳模式
- django - Django如何在使用信号从另一个模型输入后更新一些特定的字段数据