apache-spark - 为不同实体填补时间序列 Spark 中的空白
问题描述
我有一个数据框,其中包含及时与各种实体相关的日常事件。我想填补那些时间序列中的空白。
这是我拥有的汇总数据(左侧),右侧是我想要拥有的数据:
+---------+----------+-------+ +---------+----------+-------+
|entity_id| date|counter| |entity_id| date|counter|
+---------+----------+-------+ +---------+----------+-------+
| 3|2020-01-01| 7| | 3|2020-01-01| 7|
| 1|2020-01-01| 10| | 1|2020-01-01| 10|
| 2|2020-01-01| 3| | 2|2020-01-01| 3|
| 2|2020-01-02| 9| | 2|2020-01-02| 9|
| 1|2020-01-03| 15| | 1|2020-01-02| 0|
| 2|2020-01-04| 3| | 3|2020-01-02| 0|
| 1|2020-01-04| 14| | 1|2020-01-03| 15|
| 2|2020-01-05| 6| | 2|2020-01-03| 0|
+---------+----------+-------+ | 3|2020-01-03| 0|
| 3|2020-01-04| 0|
| 2|2020-01-04| 3|
| 1|2020-01-04| 14|
| 2|2020-01-05| 6|
| 1|2020-01-05| 0|
| 3|2020-01-05| 0|
+---------+----------+-------+
我使用了这个堆栈溢出主题,非常有用: Filling gaps in timeseries Spark
这是我的代码(仅过滤一个实体),它在 Python 中,但我认为 API 在 Scala 中是相同的:
(
df
.withColumn("date", sf.to_date("created_at"))
.groupBy(
sf.col("entity_id"),
sf.col("date")
)
.agg(sf.count(sf.lit(1)).alias("counter"))
.filter(sf.col("entity_id") == 1)
.select(
sf.col("date"),
sf.col("counter")
)
.join(
spark
.range(
df # range start
.filter(sf.col("entity_id") == 1)
.select(sf.unix_timestamp(sf.min("created_at")).alias("min"))
.first().min // a * a, # a = 60 * 60 * 24 = seconds in one day
(df # range end
.filter(sf.col("entity_id") == 1)
.select(sf.unix_timestamp(sf.max("created_at")).alias("max"))
.first().max // a + 1) * a,
a # range step, a = 60 * 60 * 24 = seconds in one day
)
.select(sf.to_date(sf.from_unixtime("id")).alias("date")),
["date"], # column which will be used for the join
how="right" # type of join
)
.withColumn("counter", sf.when(sf.isnull("counter"), 0).otherwise(sf.col("counter")))
.sort(sf.col("date"))
.show(200)
)
这工作得很好,但现在我想避免filter
and 做一个范围来填补每个实体(entity_id == 2
, entity_id == 3
, ...)的时间序列空白。供您参考,根据entity_id
值的不同,列的最小值和最大值date
可能会有所不同,但是如果您的帮助涉及整个数据框的全局最小值和最大值,我也可以。
如果您需要任何其他信息,请随时询问。
编辑:添加我想要的数据示例
解决方案
在创建日期范围的元素时,我宁愿使用 Pandas 函数而不是 Spark 范围,因为 Spark 范围函数在处理日期值时存在一些缺点。不同日期的数量通常很少。即使处理多年的时间跨度,不同日期的数量也非常少,可以很容易地在一个连接中广播。
#get the minimun and maximun date and collect it to the driver
min_date, max_date = df.select(F.min("date"), F.max("date")).first()
#use Pandas to create all dates and switch back to PySpark DataFrame
from pandas import pandas as pd
timerange = pd.date_range(start=min_date, end=max_date, freq='1d')
all_dates = spark.createDataFrame(timerange.to_frame(),['date'])
#get all combinations of dates and entity_ids
all_dates_and_ids = all_dates.crossJoin(df.select("entity_id").distinct())
#create the final result by doing a left join and filling null values with 0
result = all_dates_and_ids.join(df, on=['date', 'entity_id'], how="left_outer")\
.fillna({'counter':'0'}) \
.orderBy(['date', 'entity_id'])
这给
+-------------------+---------+-------+
| date|entity_id|counter|
+-------------------+---------+-------+
|2020-01-01 00:00:00| 1| 10|
|2020-01-01 00:00:00| 2| 3|
|2020-01-01 00:00:00| 3| 7|
|2020-01-02 00:00:00| 1| 0|
|2020-01-02 00:00:00| 2| 9|
|2020-01-02 00:00:00| 3| 0|
|2020-01-03 00:00:00| 1| 15|
|2020-01-03 00:00:00| 2| 0|
|2020-01-03 00:00:00| 3| 0|
|2020-01-04 00:00:00| 1| 14|
|2020-01-04 00:00:00| 2| 3|
|2020-01-04 00:00:00| 3| 0|
|2020-01-05 00:00:00| 1| 0|
|2020-01-05 00:00:00| 2| 6|
|2020-01-05 00:00:00| 3| 0|
+-------------------+---------+-------+
推荐阅读
- spring - Spring 集成网关 VS 适配器
- sql - 大查询查找可能位于多列中的数据
- salesforce - 发送电子邮件警报的工作流程规则
- jenkins - 在成功的 Github Actions 工作流上触发 Jenkins 作业
- permissions - 没有用户或角色对 SQL Server 表具有 SELECT 权限,但用户可以从中选择行
- c# - Java 的 C# 等效项 @DynamoDBTyped(DynamoDBAttributeType.BOOL)
- c - 将非数组变量的地址传递给声明为“Type ptr[static 1]”的函数参数是否有效?
- python - 虽然声明是 heroku discord bot 在一段时间后停止工作
- java - 在多级继承的情况下,如何编写一个类以将对象创建限制为一次?
- python - Python 3“getitem__(self, key)”简单错误/问题