apache-spark - spark基于其他数据集的数据集值计算
问题描述
我有一个包含三个值的数据集,“日期”、“帐户 ID”和“项目”,当在 10 天范围内的给定日期缺少帐户 ID 时,我需要进行一些计算,我采用的方法是创建过去 10 天的日期为空的数据框,并将其与原始数据集进行比较,以查看帐户 ID 是否丢失。在我的最终输出中,我需要为每个帐户 id 进行计算,它们不会在当天出现在原始数据集上。这是我尝试过的。
val df = sc.parallelize(Seq(("2018-06-01","id1","small"),
("2018-06-01","id1","small"),
("2018-06-01","id1","small"),
("2018-06-02","id1","small"),
("2018-06-03", "id2","medium"),
("2018-06-03", "id2","medium"),
("2018-06-08", null,null),
("2018-06-04", "id1","small"),
("2018-06-06", "id3","large"))).toDF("date","accountid","item")
df.show
+----------+---------+------+
| date|accountid| item|
+----------+---------+------+
|2018-06-01| id1| small|
|2018-06-01| id1| small|
|2018-06-01| id1| small|
|2018-06-02| id1| small|
|2018-06-03| id2|medium|
|2018-06-03| id2|medium|
|2018-06-08| null| null|
|2018-06-04| id1| small|
|2018-06-06| id3| large|
+----------+---------+------+
我正在考虑广播独特的 accounid 和项目值
import org.apache.spark.sql.functions._
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)
val itemList=df.select(collect_set("item")).first().getAs[Seq[String]](0)
并将原始数据集与日期数据集进行比较。
import java.time.LocalDate
import scala.collection.mutable.ListBuffer
val dateTime = LocalDate.now()
var startDate=dateTime.minusDays(10)
val stop = dateTime
var dateArray = new ListBuffer[LocalDate]()
while (startDate.isBefore(stop)) {
dateArray+=startDate
startDate = startDate.plusDays(1);
}
val dateList=dateArray.toList.map(_.toString)
dateList.toDF.show
+----------+
| value|
+----------+
|2018-07-04|
|2018-07-05|
|2018-07-06|
|2018-07-07|
|2018-07-08|
|2018-07-09|
|2018-07-10|
|2018-07-11|
|2018-07-12|
|2018-07-13|
+----------+
最后的输出我期待这样的东西。
+----------+---------
| value|dayaggregate
+----------+---------
|2018-07-04|[[id1-> [small=0,medium=1..],[id2->[small=1,medium=0,..]]]
|2018-07-05|[[id1-> [small=0,medium=1..],[id2->[small=1,medium=0,..]]]
解决这个问题的最佳方法是什么?
解决方案
推荐阅读
- php - 如何使用整数数组重新映射我的数组键?
- python - 求函数 Python
- javascript - 我应该有一个更新所有精灵的事件侦听器,还是每个精灵都有自己的事件侦听器?
- java - 更换纺织品中的一条线,留下 oldLine 的残留物
- list - Haskell 旋转列表列表
- ios - 如何同时检测两组不同的 ibeacon UUID?
- firebase - Firebase Firstore 子集合
- javascript - 如何遍历 array.map 函数中的数组?
- python-3.x - 特定 Outlook 电子邮件的 Python 问题
- javascript - 有没有办法检查字符串中一定数量的大写字符?