python - 熊猫将不规则的时间序列与不同的频率对齐
问题描述
我有 3 个时间序列,我需要随着时间的推移进行比较。显然,它们需要对齐才能具有可比性。不幸的是,3 个时间序列中有 2 个是不规则的。其他 2 个范围从ID
每天 2 到 500k 次观察。
初始时间序列每 300 毫秒可用一次,并且可以与其他两个时间序列连接。
但是我有两个问题:
- 我在上面介绍的这 3 个系列的格式为
ID, time, value
,即每个组构成一个单独的时间序列 - 制定连接条件,即假设
LEFT
最细粒度在一个时间窗口内可连接,因为可能不完全匹配
编辑
一些虚拟数据
import pandas as pd
from datetime import datetime
import numpy as np
def make_df(frequency, valueName):
date_rng = pd.date_range(start='2018-01-01', end='2018-01-02', freq=frequency)
ts = pd.Series(np.random.randn(len(date_rng)), index=date_rng)
groups = ['a', 'b', 'c', 'd', 'e']
group_series = [groups[np.random.randint(len(groups))] for i in range(0, len(date_rng))]
df = pd.DataFrame(ts, columns=[valueName])
df['group'] = group_series
return df
df_1 = make_df('ms', 'value_A')
display(df_1.head())
df_2 = make_df('H', 'value_B')
display(df_2.head())
df_3 = make_df('S', 'value_C')
display(df_3.head())
代码(都不是真正的pythonic):我正在尝试一些类似于a JOIN b ON a.group = b.group AND time in window(some_seconds)
SQL的非等连接,但是如果有多个匹配的记录,即不仅第一个而且全部匹配/生成一行,这就会出现问题。
此外,我确实对类似于 (spark): 的数据进行了分组,df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
但这可能会非常有损。
然后我发现 (pandas) Pandas 将多个数据帧与 TimeStamp 索引对齐,这看起来已经很有趣了,但只会产生完全匹配。但是,当尝试使用df_2.join(df_3, how='outer', on=['group'], rsuffix='_1')
which 时,它不仅会在(确切)时间加入,而且group
会失败并出现所需的错误pd.concat
。
经过更多搜索后,我发现(pyspark)https://github.com/twosigma/flint实现了一个时间序列连接在一个区间内 - 但是,我在使用它时遇到了问题。
解决方案
我找不到在 pandas 中执行此操作的简单方法 - 所以我直接在 spark 中执行此操作。
弗林特是我选择的工具。最初,燧石在 spark 2.2 上不起作用,但我在这里修复:https ://github.com/geoHeil/flint/commit/a2827d38e155ec8ddd4252dc62d89181f14f0c47以下工作正常:
val left = Seq((1,1L, 0.1), (1, 2L,0.2), (3,1L,0.3), (3, 2L,0.4)).toDF("groupA", "time", "valueA")
val right = Seq((1,1L, 11), (1, 2L,12), (3,1L,13), (3, 2L,14)).toDF("groupB", "time", "valueB")
val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)
val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")
即它对所有组执行某种笛卡尔连接:
mergedPerGroup.toDF.filter(col("groupA") === col("groupB")).show
+-------+------+------+------+------+
| time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000| 3| 0.3| 3| 13|
|2000000| 3| 0.4| 3| 14|
使用不同的删除重复项。
推荐阅读
- java - Intellij 不显示依赖源,但总是显示 .m2 中 jar 的反编译类
- java - 文本编辑器:如果按下某个键想要取消键事件并提供新功能
- numpy - 如何使用 keras to_categorical 对数组的 numpy 数组进行一次热编码?
- javascript - 使用移位反转数组而不反转,结果令人困惑
- r - 如何判断r中的字符串中是否有单引号或双引号
- python - PyAutoGUI 图像置信度
- node.js - mongoose save 和 passport-local-mongoose register() 有什么区别?
- python - 将snake_case中的大写字母保留为camel case
- python - Python islice 类未按预期工作
- python - 如何在配置文件中使用百分比存储/编码 URL?