首页 > 解决方案 > spark基于其他数据集的数据集值计算

问题描述

我有一个包含三个值的数据集,“日期”、“帐户 ID”和“项目”,当在 10 天范围内的给定日期缺少帐户 ID 时,我需要进行一些计算,我采用的方法是创建过去 10 天的日期为空的数据框,并将其与原始数据集进行比较,以查看帐户 ID 是否丢失。在我的最终输出中,我需要为每个帐户 id 进行计算,它们不会在当天出现在原始数据集上。这是我尝试过的。

val df = sc.parallelize(Seq(("2018-06-01","id1","small"),
  ("2018-06-01","id1","small"),  
    ("2018-06-01","id1","small"),                                                        
  ("2018-06-02","id1","small"),
  ("2018-06-03", "id2","medium"),
    ("2018-06-03", "id2","medium"),
    ("2018-06-08", null,null),         
  ("2018-06-04", "id1","small"),
  ("2018-06-06", "id3","large"))).toDF("date","accountid","item")
df.show

+----------+---------+------+
|      date|accountid|  item|
+----------+---------+------+
|2018-06-01|      id1| small|
|2018-06-01|      id1| small|
|2018-06-01|      id1| small|
|2018-06-02|      id1| small|
|2018-06-03|      id2|medium|
|2018-06-03|      id2|medium|
|2018-06-08|     null|  null|
|2018-06-04|      id1| small|
|2018-06-06|      id3| large|
+----------+---------+------+

我正在考虑广播独特的 accounid 和项目值

import org.apache.spark.sql.functions._
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)
val itemList=df.select(collect_set("item")).first().getAs[Seq[String]](0)

并将原始数据集与日期数据集进行比较。

import java.time.LocalDate
import scala.collection.mutable.ListBuffer
val dateTime = LocalDate.now()
var startDate=dateTime.minusDays(10)
val stop = dateTime
var dateArray = new ListBuffer[LocalDate]()
while (startDate.isBefore(stop)) {
            dateArray+=startDate
            startDate = startDate.plusDays(1);
}
val dateList=dateArray.toList.map(_.toString)
dateList.toDF.show

+----------+
|     value|
+----------+
|2018-07-04|
|2018-07-05|
|2018-07-06|
|2018-07-07|
|2018-07-08|
|2018-07-09|
|2018-07-10|
|2018-07-11|
|2018-07-12|
|2018-07-13|
+----------+

最后的输出我期待这样的东西。

+----------+---------
|     value|dayaggregate
+----------+---------
|2018-07-04|[[id1-> [small=0,medium=1..],[id2->[small=1,medium=0,..]]]
|2018-07-05|[[id1-> [small=0,medium=1..],[id2->[small=1,medium=0,..]]]

解决这个问题的最佳方法是什么?

标签: apache-sparkapache-spark-sql

解决方案


推荐阅读