scala - 基于最大日期记录的 Spark 过滤器
问题描述
我正在使用 Spark/Scala 处理Hive
包含每个成员的事务数据的表。我需要获取每个成员的最大记录。我使用下面的代码完成了这项任务,它成功运行,但没有得到性能。
我需要问是否有任何其他方法可以增强此代码的性能?我找到了一些使用 spark-sql 的方法,但我更喜欢Spark
Dataframe 或 Dataset。
下面的示例将重现我的代码和我的数据。
val mamberData = Seq(
Row("1234", "CX", java.sql.Timestamp.valueOf("2018-09-09 00:00:00")),
Row("1234", "CX", java.sql.Timestamp.valueOf("2018-03-02 00:00:00")),
Row("5678", "NY", java.sql.Timestamp.valueOf("2019-01-01 00:00:00")),
Row("5678", "NY", java.sql.Timestamp.valueOf("2018-01-01 00:00:00")),
Row("7088", "SF", java.sql.Timestamp.valueOf("2018-09-01 00:00:00"))
)
val MemberDataSchema = List(
StructField("member_id", StringType, nullable = true),
StructField("member_state", StringType, nullable = true),
StructField("activation_date", TimestampType, nullable = true)
)
import spark.implicits._
val memberDF =spark.createDataFrame(
spark.sparkContext.parallelize(mamberData),
StructType(MemberDataSchema)
)
val memberDfMaxDate = memberDF.groupBy('member_id).agg(max('activation_date).as("activation_date"))
val memberDFMaxOnly = memberDF.join(memberDfMaxDate,Seq("member_id","activation_date"))
输出如下
+---------+------------+-------------------+
|member_id|member_state|activation_date |
+---------+------------+-------------------+
|1234 |CX |2018-09-09 00:00:00|
|1234 |CX |2018-03-02 00:00:00|
|5678 |NY |2019-01-01 00:00:00|
|5678 |NY |2018-01-01 00:00:00|
|7088 |SF |2018-09-01 00:00:00|
+---------+------------+-------------------+
+---------+-------------------+------------+
|member_id| activation_date|member_state|
+---------+-------------------+------------+
| 7088|2018-09-01 00:00:00| SF|
| 1234|2018-09-09 00:00:00| CX|
| 5678|2019-01-01 00:00:00| NY|
+---------+-------------------+------------+
解决方案
您可以使用很多技术,例如Ranking
或Dataset
. 我更喜欢使用reduceGroups
它,因为它是函数式的方式并且易于解释。
case class MemberDetails(member_id: String, member_state: String, activation_date: FileStreamSource.Timestamp)
val dataDS: Dataset[MemberDetails] = spark.createDataFrame(
spark.sparkContext.parallelize(mamberData),
StructType(MemberDataSchema)
).as[MemberDetails]
.groupByKey(_.member_id)
.reduceGroups((r1, r2) ⇒ if (r1.activation_date > r2.activation_date) r1 else r2)
.map { case (key, row) ⇒ row }
dataDS.show(truncate = false)
推荐阅读
- json - 从 API 错误中获取数据,JSON 输入意外结束
- html - 为什么表格需要 width:0 来尊重列宽?
- java - FAIL - 无法启动上下文路径 /sagar 处的应用程序
- google-chrome - 谷歌登录按钮在 Chrome 中不起作用
- scala - scala导入库通配符
- javascript - Parse 获取没有任何实际数据的对象
- oracle - oracle 11g文件在PL/SQL proc中读取而不创建目录
- java - 使用不同的编码器/解码器将二进制转换为 base64
- php - 如何在 RabbitMQ 中发送带有通配符队列名称的消费任务?
- php - 如何使用 PHP 简单的 HTML DOM 解析器获取标签的属性