PySpark: how to group, resample, and forward-fill null values?

Problem Description

Given the following dataset in Spark, I would like to resample the dates at a specific frequency (e.g. every 5 minutes).

import datetime as dt
import pandas as pd

START_DATE = dt.datetime(2019, 8, 15, 20, 33, 0)
test_df = pd.DataFrame({
    'school_id': ['remote','remote','remote','remote','onsite','onsite','onsite','onsite','remote','remote'],
    'class_id': ['green', 'green', 'red', 'red', 'green', 'green', 'green', 'green', 'red', 'green'],
    'user_id': [15,15,16,16,15,17,17,17,16,17],
    'status': [0,1,1,1,0,1,0,1,1,0],
    'start': pd.date_range(start=START_DATE, periods=10, freq='2min')
})

test_df.groupby(['school_id', 'class_id', 'user_id', 'start']).min()

However, I also want the resampling to happen only between two fixed bounds: 2019-08-15 20:30:00 and 2019-08-15 21:00:00. Each group of school_id, class_id and user_id would then have 6 entries, one per 5-minute slot between the two bounds, and the null entries produced by the resampling should be filled by forward fill.

I have used Pandas for the example dataset, but the actual dataframe will be pulled in Spark, so the approach I am looking for should work in Spark as well.

I guess the approach is probably similar to the one in PySpark: how to resample frequencies, but I haven't been able to make it work for this case.

Thanks for your help.

Tags: python, pyspark

Solution


This may not be the best way to get the final result; I just want to demonstrate the idea here.

  1. First, create the DataFrame and convert the timestamps to integers
from datetime import datetime

import numpy as np
import pandas as pd
from pytz import timezone

from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType

# Create DataFrame
START_DATE = datetime(2019,8,15,20,33,0)
test_df = pd.DataFrame({
    'school_id': ['remote','remote','remote','remote','onsite','onsite','onsite','onsite','remote','remote'],
    'class_id': ['green', 'green', 'red', 'red', 'green', 'green', 'green', 'green', 'red', 'green'],
    'user_id': [15,15,16,16,15,17,17,17,16,17],
    'status': [0,1,1,1,0,1,0,1,1,0],
    'start': pd.date_range(start=START_DATE, periods=10, freq='2min')
})

# Cast the timestamp column to bigint (seconds since the Unix epoch)
df = spark.createDataFrame(test_df)
print(df.dtypes)
df = df.withColumn('start', F.col('start').cast("bigint"))
df.show()

This outputs:

+---------+--------+-------+------+----------+
|school_id|class_id|user_id|status|     start|
+---------+--------+-------+------+----------+
|   remote|   green|     15|     0|1565915580|
|   remote|   green|     15|     1|1565915700|
|   remote|     red|     16|     1|1565915820|
|   remote|     red|     16|     1|1565915940|
|   onsite|   green|     15|     0|1565916060|
|   onsite|   green|     17|     1|1565916180|
|   onsite|   green|     17|     0|1565916300|
|   onsite|   green|     17|     1|1565916420|
|   remote|     red|     16|     1|1565916540|
|   remote|   green|     17|     0|1565916660|
+---------+--------+-------+------+----------+
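As a quick check, the bigint produced by the cast is just seconds since the Unix epoch, so it can be round-tripped; a minimal sketch (it assumes the df and F alias from above, and start_check is a throwaway column name):

# from_unixtime renders epoch seconds back as a timestamp string in the
# session timezone, so it should match the original 'start' values
df.withColumn('start_check', F.from_unixtime('start')).show(3, truncate=False)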
  2. Create the time sequence you want
# Create the time sequence needed
start = datetime.strptime('2019-08-15 20:30:00', '%Y-%m-%d %H:%M:%S')
eastern = timezone('US/Eastern')
start = eastern.localize(start)
times = pd.date_range(start=start, periods=6, freq='5min')
times = [s.timestamp() for s in times]
print(times)
[1565915400.0, 1565915700.0, 1565916000.0, 1565916300.0, 1565916600.0, 1565916900.0]
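As an aside, if you would rather build this 5-minute grid inside Spark instead of on the driver with pandas, Spark 2.4+ can generate it natively with sequence; a sketch under that assumption (the grid name and the 20:55 upper bound are mine, and the epochs follow spark.sql.session.timeZone rather than the pytz US/Eastern localization used above):

# Build the same 6-point grid natively in Spark (requires Spark 2.4+):
# sequence() returns an array of epoch seconds, explode() unpacks it
grid = spark.range(1).select(
    F.explode(F.sequence(
        F.unix_timestamp(F.lit('2019-08-15 20:30:00')),
        F.unix_timestamp(F.lit('2019-08-15 20:55:00')),
        F.lit(300)  # 5-minute step, in seconds
    )).alias('epoch')
)
grid.show()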
  3. Finally, create the DataFrame for each group
# Use a grouped-map pandas_udf to build the resampled frame per group
schm = StructType(df.schema.fields + [StructField('epoch', IntegerType(), True)])

@pandas_udf(schm, PandasUDFType.GROUPED_MAP)
def resample(pdf):
    # One row per grid point; the group keys are constant within pdf
    pddf = pd.DataFrame({'epoch': times})
    pddf['school_id'] = pdf['school_id'][0]
    pddf['class_id'] = pdf['class_id'][0]
    pddf['user_id'] = pdf['user_id'][0]

    # Map each original 'start' onto the 5-minute grid; grid points with
    # no matching observation stay NaN, which Spark reads as null
    res = np.searchsorted(times, pdf['start'])
    arr = np.zeros(len(times))
    arr[:] = np.nan
    arr[res] = pdf['status']
    pddf['status'] = arr

    arr[:] = np.nan
    arr[res] = pdf['start']
    pddf['start'] = arr
    return pddf

df = df.groupBy('school_id', 'class_id', 'user_id').apply(resample)
df = df.withColumn('timestamp', F.to_timestamp(df['epoch']))
df.show(60)
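(Side note: PandasUDFType.GROUPED_MAP and groupBy(...).apply(...) are deprecated as of Spark 3.0. On a newer cluster the same logic can be expressed with applyInPandas; a sketch, assuming resample is then defined as a plain function without the decorator:)

# Spark 3.x style: pass the undecorated function and the output schema
# directly to applyInPandas instead of using a GROUPED_MAP pandas_udf
df = df.groupBy('school_id', 'class_id', 'user_id').applyInPandas(resample, schema=schm)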

Final result:

+---------+--------+-------+------+----------+----------+-------------------+
|school_id|class_id|user_id|status|     start|     epoch|          timestamp|
+---------+--------+-------+------+----------+----------+-------------------+
|   remote|     red|     16|  null|      null|1565915400|2019-08-15 20:30:00|
|   remote|     red|     16|  null|      null|1565915700|2019-08-15 20:35:00|
|   remote|     red|     16|     1|1565915940|1565916000|2019-08-15 20:40:00|
|   remote|     red|     16|  null|      null|1565916300|2019-08-15 20:45:00|
|   remote|     red|     16|     1|1565916540|1565916600|2019-08-15 20:50:00|
|   remote|     red|     16|  null|      null|1565916900|2019-08-15 20:55:00|
|   onsite|   green|     15|  null|      null|1565915400|2019-08-15 20:30:00|
|   onsite|   green|     15|  null|      null|1565915700|2019-08-15 20:35:00|
|   onsite|   green|     15|  null|      null|1565916000|2019-08-15 20:40:00|
|   onsite|   green|     15|     0|1565916060|1565916300|2019-08-15 20:45:00|
|   onsite|   green|     15|  null|      null|1565916600|2019-08-15 20:50:00|
|   onsite|   green|     15|  null|      null|1565916900|2019-08-15 20:55:00|
|   remote|   green|     17|  null|      null|1565915400|2019-08-15 20:30:00|
|   remote|   green|     17|  null|      null|1565915700|2019-08-15 20:35:00|
|   remote|   green|     17|  null|      null|1565916000|2019-08-15 20:40:00|
|   remote|   green|     17|  null|      null|1565916300|2019-08-15 20:45:00|
|   remote|   green|     17|  null|      null|1565916600|2019-08-15 20:50:00|
|   remote|   green|     17|     0|1565916660|1565916900|2019-08-15 20:55:00|
|   onsite|   green|     17|  null|      null|1565915400|2019-08-15 20:30:00|
|   onsite|   green|     17|  null|      null|1565915700|2019-08-15 20:35:00|
|   onsite|   green|     17|  null|      null|1565916000|2019-08-15 20:40:00|
|   onsite|   green|     17|     1|1565916180|1565916300|2019-08-15 20:45:00|
|   onsite|   green|     17|     1|1565916420|1565916600|2019-08-15 20:50:00|
|   onsite|   green|     17|  null|      null|1565916900|2019-08-15 20:55:00|
|   remote|   green|     15|  null|      null|1565915400|2019-08-15 20:30:00|
|   remote|   green|     15|     0|1565915580|1565915700|2019-08-15 20:35:00|
|   remote|   green|     15|  null|      null|1565916000|2019-08-15 20:40:00|
|   remote|   green|     15|  null|      null|1565916300|2019-08-15 20:45:00|
|   remote|   green|     15|  null|      null|1565916600|2019-08-15 20:50:00|
|   remote|   green|     15|  null|      null|1565916900|2019-08-15 20:55:00|
+---------+--------+-------+------+----------+----------+-------------------+

Now each group has 6 timestamps. Note that not all of the original 'status' and 'start' values make it into the final DataFrame. Because the resampling in the resample udf happens on a 5-minute grid, two 'start' times can map to the same grid point, and one of them is lost. The udf can be adjusted to your frequency and to how you want to keep the data.
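Finally, the question also asked for the remaining nulls to be forward-filled, which the steps above stop short of. One way to finish that in Spark is a per-group window ordered by epoch combined with last(..., ignorenulls=True); a minimal sketch against the final df from above:

from pyspark.sql import Window

# Forward fill: within each group, scan rows in epoch order and carry the
# last non-null value seen so far into the null slots
w = (Window.partitionBy('school_id', 'class_id', 'user_id')
           .orderBy('epoch')
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))

df = (df.withColumn('status', F.last('status', ignorenulls=True).over(w))
        .withColumn('start', F.last('start', ignorenulls=True).over(w)))
df.show(60)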

