首页 > 解决方案 > 基于最大日期记录的 Spark 过滤器

问题描述

我正在使用 Spark/Scala 处理Hive包含每个成员的事务数据的表。我需要获取每个成员的最大记录。我使用下面的代码完成了这项任务,它成功运行,但没有得到性能。

我需要问是否有任何其他方法可以增强此代码的性能?我找到了一些使用 spark-sql 的方法,但我更喜欢SparkDataframe 或 Dataset。

下面的示例将重现我的代码和我的数据。

  val mamberData = Seq(
    Row("1234", "CX", java.sql.Timestamp.valueOf("2018-09-09 00:00:00")),
    Row("1234", "CX", java.sql.Timestamp.valueOf("2018-03-02 00:00:00")),
    Row("5678", "NY", java.sql.Timestamp.valueOf("2019-01-01 00:00:00")),
    Row("5678", "NY", java.sql.Timestamp.valueOf("2018-01-01 00:00:00")),
    Row("7088", "SF", java.sql.Timestamp.valueOf("2018-09-01 00:00:00"))
  )



  val MemberDataSchema = List(
    StructField("member_id", StringType, nullable = true),
    StructField("member_state", StringType, nullable = true),
    StructField("activation_date", TimestampType, nullable = true)
  )

  import spark.implicits._

  val memberDF =spark.createDataFrame(
    spark.sparkContext.parallelize(mamberData),
    StructType(MemberDataSchema)
  )

  val memberDfMaxDate = memberDF.groupBy('member_id).agg(max('activation_date).as("activation_date"))

  val memberDFMaxOnly = memberDF.join(memberDfMaxDate,Seq("member_id","activation_date"))

输出如下

+---------+------------+-------------------+
|member_id|member_state|activation_date    |
+---------+------------+-------------------+
|1234     |CX          |2018-09-09 00:00:00|
|1234     |CX          |2018-03-02 00:00:00|
|5678     |NY          |2019-01-01 00:00:00|
|5678     |NY          |2018-01-01 00:00:00|
|7088     |SF          |2018-09-01 00:00:00|
+---------+------------+-------------------+

+---------+-------------------+------------+
|member_id|    activation_date|member_state|
+---------+-------------------+------------+
|     7088|2018-09-01 00:00:00|          SF|
|     1234|2018-09-09 00:00:00|          CX|
|     5678|2019-01-01 00:00:00|          NY|
+---------+-------------------+------------+

标签: scalaapache-spark

解决方案


您可以使用很多技术,例如RankingDataset. 我更喜欢使用reduceGroups它,因为它是函数式的方式并且易于解释。

  case class MemberDetails(member_id: String, member_state: String, activation_date: FileStreamSource.Timestamp)

  val dataDS: Dataset[MemberDetails] = spark.createDataFrame(
    spark.sparkContext.parallelize(mamberData),
    StructType(MemberDataSchema)
  ).as[MemberDetails]
    .groupByKey(_.member_id)
    .reduceGroups((r1, r2) ⇒ if (r1.activation_date > r2.activation_date) r1 else r2)
    .map { case (key, row) ⇒ row }


  dataDS.show(truncate = false)

推荐阅读