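python - PySpark: How to group by, resample, and forward-fill null values?
Problem description
Given the following dataset in Spark, I would like to resample the dates at a specific frequency (e.g. 5 minutes).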
import datetime as dt
import pandas as pd

START_DATE = dt.datetime(2019, 8, 15, 20, 33, 0)
test_df = pd.DataFrame({
    'school_id': ['remote','remote','remote','remote','onsite','onsite','onsite','onsite','remote','remote'],
    'class_id': ['green', 'green', 'red', 'red', 'green', 'green', 'green', 'green', 'red', 'green'],
    'user_id': [15,15,16,16,15,17,17,17,16,17],
    'status': [0,1,1,1,0,1,0,1,1,0],
    'start': pd.date_range(start=START_DATE, periods=10, freq='2min')
})
test_df.groupby(['school_id', 'class_id', 'user_id', 'start']).min()
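However, I also want the resampling to happen between two specific timestamps: 2019-08-15 20:30:00 and 2019-08-15 21:00:00. Each group of school_id, class_id, and user_id would therefore have 6 entries, one for every 5 minutes between the two timestamps. The null entries produced by the resampling should be filled by forward fill.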
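I used Pandas for the sample dataset, but the actual DataFrame will be loaded in Spark, so the approach I am looking for should also run in Spark.
I suspect the approach is similar to the one in "PySpark: how to resample frequencies", but I could not get it to work in this scenario.
Thanks for your help.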
Solution
This may not be the best way to get the final result; I just want to show the idea here.
- First, create the DataFrame and convert the timestamps to integers
from datetime import datetime
import pandas as pd
from pytz import timezone
from pyspark.sql import functions as F

# Create the DataFrame (assumes an active SparkSession named `spark`)
START_DATE = datetime(2019, 8, 15, 20, 33, 0)
test_df = pd.DataFrame({
    'school_id': ['remote','remote','remote','remote','onsite','onsite','onsite','onsite','remote','remote'],
    'class_id': ['green', 'green', 'red', 'red', 'green', 'green', 'green', 'green', 'red', 'green'],
    'user_id': [15,15,16,16,15,17,17,17,16,17],
    'status': [0,1,1,1,0,1,0,1,1,0],
    'start': pd.date_range(start=START_DATE, periods=10, freq='2min')
})
# Convert the timestamps to integers (epoch seconds)
df = spark.createDataFrame(test_df)
print(df.dtypes)
df = df.withColumn('start', F.col('start').cast('bigint'))
df.show()
This outputs:
+---------+--------+-------+------+----------+
|school_id|class_id|user_id|status|     start|
+---------+--------+-------+------+----------+
|   remote|   green|     15|     0|1565915580|
|   remote|   green|     15|     1|1565915700|
|   remote|     red|     16|     1|1565915820|
|   remote|     red|     16|     1|1565915940|
|   onsite|   green|     15|     0|1565916060|
|   onsite|   green|     17|     1|1565916180|
|   onsite|   green|     17|     0|1565916300|
|   onsite|   green|     17|     1|1565916420|
|   remote|     red|     16|     1|1565916540|
|   remote|   green|     17|     0|1565916660|
+---------+--------+-------+------+----------+
- Create the time sequence you want
# Create the time sequence needed (epoch seconds on a 5-minute grid)
start = datetime.strptime('2019-08-15 20:30:00', '%Y-%m-%d %H:%M:%S')
eastern = timezone('US/Eastern')
start = eastern.localize(start)
times = pd.date_range(start=start, periods=6, freq='5min')
times = [s.timestamp() for s in times]
print(times)
[1565915400.0, 1565915700.0, 1565916000.0, 1565916300.0, 1565916600.0, 1565916900.0]
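As a cross-check of the grid above, here is a minimal pandas-only sketch that builds the same six epoch values without pytz. It assumes, as the answer does, that the wall-clock times are in US/Eastern; the variable names `grid` and `first_start` are only illustrative. It also shows that 20:33 in that zone corresponds to the first `start` value in the integer table, which is why the grid has to be localized to the same zone.

```python
import pandas as pd

# Assumption: timestamps are wall-clock times in US/Eastern, as in the answer.
grid = pd.date_range('2019-08-15 20:30:00', periods=6, freq='5min',
                     tz='US/Eastern')

# Epoch seconds as plain integers, one per 5-minute grid point.
times = [int(ts.timestamp()) for ts in grid]
print(times)

# The first sample in the dataset starts at 20:33 local time.
first_start = pd.Timestamp('2019-08-15 20:33:00', tz='US/Eastern')
print(int(first_start.timestamp()))
```

If the Spark session runs in a different time zone, the integers produced by the cast to bigint shift accordingly, so the grid would need to be built in that zone instead.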
- Finally, build the DataFrame for each group
import numpy as np
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType

# Use pandas_udf to create the final DataFrame
# (on Spark 3.0+, groupBy(...).applyInPandas(func, schema) is the newer form of this API)
schm = StructType(df.schema.fields + [StructField('epoch', LongType(), True)])

@pandas_udf(schm, PandasUDFType.GROUPED_MAP)
def resample(pdf):
    grid = np.array(times, dtype='int64')
    # One output row per grid point, labelled with this group's keys
    pddf = pd.DataFrame({'epoch': grid})
    pddf['school_id'] = pdf['school_id'].iloc[0]
    pddf['class_id'] = pdf['class_id'].iloc[0]
    pddf['user_id'] = pdf['user_id'].iloc[0]
    # Index of the first grid point >= each start time
    # (assumes every start is no later than the last grid point)
    res = np.searchsorted(grid, pdf['start'].values)
    status = np.full(len(grid), np.nan)
    status[res] = pdf['status'].values
    pddf['status'] = status
    start = np.full(len(grid), np.nan)
    start[res] = pdf['start'].values
    pddf['start'] = start
    return pddf

df = df.groupBy('school_id', 'class_id', 'user_id').apply(resample)
df = df.withColumn('timestamp', F.col('epoch').cast('timestamp'))
df.show(60)
Final result:
+---------+--------+-------+------+----------+----------+-------------------+
|school_id|class_id|user_id|status|     start|     epoch|          timestamp|
+---------+--------+-------+------+----------+----------+-------------------+
|   remote|     red|     16|  null|      null|1565915400|2019-08-15 20:30:00|
|   remote|     red|     16|  null|      null|1565915700|2019-08-15 20:35:00|
|   remote|     red|     16|     1|1565915940|1565916000|2019-08-15 20:40:00|
|   remote|     red|     16|  null|      null|1565916300|2019-08-15 20:45:00|
|   remote|     red|     16|     1|1565916540|1565916600|2019-08-15 20:50:00|
|   remote|     red|     16|  null|      null|1565916900|2019-08-15 20:55:00|
|   onsite|   green|     15|  null|      null|1565915400|2019-08-15 20:30:00|
|   onsite|   green|     15|  null|      null|1565915700|2019-08-15 20:35:00|
|   onsite|   green|     15|  null|      null|1565916000|2019-08-15 20:40:00|
|   onsite|   green|     15|     0|1565916060|1565916300|2019-08-15 20:45:00|
|   onsite|   green|     15|  null|      null|1565916600|2019-08-15 20:50:00|
|   onsite|   green|     15|  null|      null|1565916900|2019-08-15 20:55:00|
|   remote|   green|     17|  null|      null|1565915400|2019-08-15 20:30:00|
|   remote|   green|     17|  null|      null|1565915700|2019-08-15 20:35:00|
|   remote|   green|     17|  null|      null|1565916000|2019-08-15 20:40:00|
|   remote|   green|     17|  null|      null|1565916300|2019-08-15 20:45:00|
|   remote|   green|     17|  null|      null|1565916600|2019-08-15 20:50:00|
|   remote|   green|     17|     0|1565916660|1565916900|2019-08-15 20:55:00|
|   onsite|   green|     17|  null|      null|1565915400|2019-08-15 20:30:00|
|   onsite|   green|     17|  null|      null|1565915700|2019-08-15 20:35:00|
|   onsite|   green|     17|  null|      null|1565916000|2019-08-15 20:40:00|
|   onsite|   green|     17|     1|1565916180|1565916300|2019-08-15 20:45:00|
|   onsite|   green|     17|     1|1565916420|1565916600|2019-08-15 20:50:00|
|   onsite|   green|     17|  null|      null|1565916900|2019-08-15 20:55:00|
|   remote|   green|     15|  null|      null|1565915400|2019-08-15 20:30:00|
|   remote|   green|     15|     0|1565915580|1565915700|2019-08-15 20:35:00|
|   remote|   green|     15|  null|      null|1565916000|2019-08-15 20:40:00|
|   remote|   green|     15|  null|      null|1565916300|2019-08-15 20:45:00|
|   remote|   green|     15|  null|      null|1565916600|2019-08-15 20:50:00|
|   remote|   green|     15|  null|      null|1565916900|2019-08-15 20:55:00|
+---------+--------+-------+------+----------+----------+-------------------+
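Now every group has 6 timestamps. Note that not every original "status" and "start" value makes it into the final DataFrame: inside the resample udf, rows are snapped to the 5-minute grid, so two "start" times can land on the same grid point, and one of them is lost. The udf can be adjusted to suit your frequency and how you want to keep the data.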
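The result above still contains nulls, so the forward fill requested in the question remains to be added. The following pandas-only sketch shows one way the body of the udf could be adapted: it snaps rows to the grid, keeps the latest row when two fall on the same grid point, and forward-fills the gaps. The helper name `resample_group` and the sample `group` are hypothetical, and this is a variant of the answer's approach rather than the answer's own code.

```python
import numpy as np
import pandas as pd

# Grid of epoch seconds taken from the answer (20:30 to 20:55, 5-minute steps).
GRID = np.array([1565915400, 1565915700, 1565916000,
                 1565916300, 1565916600, 1565916900])


def resample_group(pdf, grid=GRID):
    """Snap one group's rows onto `grid` and forward-fill the gaps."""
    # Slot of the first grid point that is >= each start time.
    slot = np.searchsorted(grid, pdf['start'].to_numpy())
    snapped = (
        pdf.assign(slot=slot)
           .sort_values('start')
           .groupby('slot')[['status', 'start']]
           .last()                      # latest row wins when slots collide
           .reindex(range(len(grid)))   # one row per grid point, NaN if empty
           .ffill()                     # forward-fill status and start
    )
    snapped.insert(0, 'epoch', grid)
    return snapped.reset_index(drop=True)


# Hypothetical single group: three samples inside the window.
group = pd.DataFrame({
    'status': [0, 1, 1],
    'start': [1565915580, 1565916180, 1565916420],
})
result = resample_group(group)
print(result)
```

Grid points before the first sample stay null because there is nothing to carry forward, and rows that start after the last grid point are dropped by the reindex step. Inside Spark, the same logic would run per group, with the group key columns added back before returning.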