首页 > 解决方案 > 为不同实体填补时间序列 Spark 中的空白

问题描述

我有一个数据框,其中包含及时与各种实体相关的日常事件。我想填补那些时间序列中的空白。

这是我拥有的汇总数据(左侧),右侧是我想要拥有的数据:

+---------+----------+-------+               +---------+----------+-------+
|entity_id|      date|counter|               |entity_id|      date|counter|
+---------+----------+-------+               +---------+----------+-------+
|        3|2020-01-01|      7|               |        3|2020-01-01|      7|
|        1|2020-01-01|     10|               |        1|2020-01-01|     10|
|        2|2020-01-01|      3|               |        2|2020-01-01|      3|
|        2|2020-01-02|      9|               |        2|2020-01-02|      9|
|        1|2020-01-03|     15|               |        1|2020-01-02|      0|
|        2|2020-01-04|      3|               |        3|2020-01-02|      0|
|        1|2020-01-04|     14|               |        1|2020-01-03|     15|
|        2|2020-01-05|      6|               |        2|2020-01-03|      0|
+---------+----------+-------+               |        3|2020-01-03|      0|
                                             |        3|2020-01-04|      0|
                                             |        2|2020-01-04|      3|
                                             |        1|2020-01-04|     14|
                                             |        2|2020-01-05|      6|
                                             |        1|2020-01-05|      0|
                                             |        3|2020-01-05|      0|
                                             +---------+----------+-------+

我使用了这个堆栈溢出主题,非常有用: Filling gaps in timeseries Spark

这是我的代码(仅过滤一个实体),它在 Python 中,但我认为 API 在 Scala 中是相同的:

(
    df
    .withColumn("date", sf.to_date("created_at"))
    .groupBy(
        sf.col("entity_id"),
        sf.col("date")
    )
    .agg(sf.count(sf.lit(1)).alias("counter"))
    .filter(sf.col("entity_id") == 1)
    .select(
        sf.col("date"),
        sf.col("counter")
    )
    .join(
        spark
        .range(
            df # range start
            .filter(sf.col("entity_id") == 1)
            .select(sf.unix_timestamp(sf.min("created_at")).alias("min"))
            .first().min // a * a, # a = 60 * 60 * 24 = seconds in one day
            
            (df # range end
            .filter(sf.col("entity_id") == 1)
            .select(sf.unix_timestamp(sf.max("created_at")).alias("max"))
            .first().max // a + 1) * a,
            
            a # range step, a = 60 * 60 * 24 = seconds in one day
        )
        .select(sf.to_date(sf.from_unixtime("id")).alias("date")),
        ["date"], # column which will be used for the join
        how="right" # type of join
    )
    .withColumn("counter", sf.when(sf.isnull("counter"), 0).otherwise(sf.col("counter")))
    .sort(sf.col("date"))
    .show(200)
)

这工作得很好,但现在我想避免filterand 做一个范围来填补每个实体(entity_id == 2, entity_id == 3, ...)的时间序列空白。供您参考,根据entity_id值的不同,列的最小值和最大值date可能会有所不同,但是如果您的帮助涉及整个数据框的全局最小值和最大值,我也可以。

如果您需要任何其他信息,请随时询问。

编辑:添加我想要的数据示例

标签: apache-sparkpyspark

解决方案


在创建日期范围的元素时,我宁愿使用 Pandas 函数而不是 Spark 范围,因为 Spark 范围函数在处理日期值时存在一些缺点。不同日期的数量通常很少。即使处理多年的时间跨度,不同日期的数量也非常少,可以很容易地在一个连接中广播。

#get the minimun and maximun date and collect it to the driver
min_date, max_date = df.select(F.min("date"), F.max("date")).first()

#use Pandas to create all dates and switch back to PySpark DataFrame
from pandas import pandas as pd
timerange = pd.date_range(start=min_date, end=max_date, freq='1d')
all_dates = spark.createDataFrame(timerange.to_frame(),['date'])

#get all combinations of dates and entity_ids
all_dates_and_ids = all_dates.crossJoin(df.select("entity_id").distinct())

#create the final result by doing a left join and filling null values with 0
result = all_dates_and_ids.join(df, on=['date', 'entity_id'], how="left_outer")\
    .fillna({'counter':'0'}) \
    .orderBy(['date', 'entity_id'])

这给

+-------------------+---------+-------+
|               date|entity_id|counter|
+-------------------+---------+-------+
|2020-01-01 00:00:00|        1|     10|
|2020-01-01 00:00:00|        2|      3|
|2020-01-01 00:00:00|        3|      7|
|2020-01-02 00:00:00|        1|      0|
|2020-01-02 00:00:00|        2|      9|
|2020-01-02 00:00:00|        3|      0|
|2020-01-03 00:00:00|        1|     15|
|2020-01-03 00:00:00|        2|      0|
|2020-01-03 00:00:00|        3|      0|
|2020-01-04 00:00:00|        1|     14|
|2020-01-04 00:00:00|        2|      3|
|2020-01-04 00:00:00|        3|      0|
|2020-01-05 00:00:00|        1|      0|
|2020-01-05 00:00:00|        2|      6|
|2020-01-05 00:00:00|        3|      0|
+-------------------+---------+-------+

推荐阅读