首页 > 解决方案 > 在pyspark中以一秒为粒度的时间范围内计算与一种特定类型的时间的每个时间差的类型

问题描述

DataFrame我在pyspark中有以下时间序列数据:

(id, timestamp, type)

我的问题如下:

有什么方法可以计算(使用 SQL 或 pyspark DataFrame 操作):

一个时间范围内(如[-5,+5])内所有type='A'行对应的时间戳与时间戳的所有时间差的每种类型的计数,粒度为1秒

例如,对于以下内容DataFrame

ts_df = sc.parallelize([
    (1,'A',100),(2,'A',1000),(3,'A',10000),

    (1,'b',99),(1,'b',99),(1,'b',99),
    (2,'b',999),(2,'b',999),(2,'c',999),(2,'c',999),(1,'d',999),
    (3,'c',9999),(3,'c',9999),(3,'d',9999),

    (1,'b',98),(1,'b',98),
    (2,'b',998),(2,'c',998),
    (3,'c',9998)
]).toDF(["id","type","ts"])
ts_df.show()
+---+----+-----+
| id|type|   ts|
+---+----+-----+
|  1|   A|  100|
|  2|   A| 1000|
|  3|   A|10000|
|  1|   b|   99|
|  1|   b|   99|
|  1|   b|   99|
|  2|   b|  999|
|  2|   b|  999|
|  2|   c|  999|
|  2|   c|  999|
|  1|   d|  999|
|  3|   c| 9999|
|  3|   c| 9999|
|  3|   d| 9999|
|  1|   b|   98|
|  1|   b|   98|
|  2|   b|  998|
|  2|   c|  998|
|  3|   c| 9998|
+---+----+-----+

对于-1秒的时间差,结果应该是:

# result for time difference = -1 sec
# b: 5
# c: 4
# d: 2

而对于-2秒的时间差,结果应该是:

# result for time difference = -2 sec
# b: 3
# c: 2
# d: 0

以此类推,以1秒为粒度的时间范围内的任何时间差。

我尝试了许多不同的方法,主要是使用groupBy,但似乎没有任何效果。

我在如何表达 type= 每一行的时间差方面遇到困难,A即使我必须针对一个特定的时间差来做。

任何建议将不胜感激!

编辑:

如果我只需要针对一个特定的时差time_difference执行此操作,那么我可以通过以下方式执行此操作:

time_difference = -1
df_type_A = ts_df.where(F.col("type")=='A').selectExpr("ts as fts")
res = df_type_A.join(ts_df, on=df_type_A.fts+time_difference==ts_df.ts)\
.drop("ts","fts").groupBy(F.col("type")).count()

返回的resDataFrame 将为我提供一个特定时差的确切信息。我创建了一个循环并通过一遍又一遍地重复相同的查询来解决问题。

但是,还有比这更有效的方法吗?

EDIT2(解决方案) 所以我最后就是这样做的:

df1 = sc.parallelize([
    (1,'b',99),(1,'b',99),(1,'b',99),
    (2,'b',999),(2,'b',999),(2,'c',999),(2,'c',999),(2,'d',999),
    (3,'c',9999),(3,'c',9999),(3,'d',9999),

    (1,'b',98),(1,'b',98),
    (2,'b',998),(2,'c',998),
    (3,'c',9998)
]).toDF(["id","type","ts"])
df1.show()

df2 = sc.parallelize([
    (1,'A',100),(2,'A',1000),(3,'A',10000),
]).toDF(["id","type","ts"]).selectExpr("id as fid","ts as fts","type as ftype")
df2.show()

df3 = df2.join(df1, on=df1.id==df2.fid).withColumn("td", F.col("ts")-F.col("fts"))
df3.show()

df4 = df3.groupBy([F.col("type"),F.col("td")]).count()
df4.show()

我会尽快更新性能细节。

谢谢!

标签: sqlpysparktime-seriesdata-analysisolap

解决方案


解决此问题的另一种方法是:

  • 将现有数据帧分成两个数据帧 - 有 A 和没有 A
  • 在没有 A df 的情况下添加一个新列,它是“ts”和 time_difference 的总和
  • 加入数据框、分组依据和计数。

这是一个代码:

from pyspark.sql.functions import lit
time_difference = 1
ts_df_A = (
    ts_df
    .filter(ts_df["type"] == "A")
    .drop("id")
    .drop("type")
)

ts_df_td = (
    ts_df
    .withColumn("ts_plus_td", lit(ts_df['ts'] + time_difference))
    .filter(ts_df["type"] != "A")
    .drop("ts")
)

joined_df = ts_df_A.join(ts_df_td, ts_df_A["ts"] == ts_df_td["ts_plus_td"])
agg_df = joined_df.groupBy("type").count()

>>> agg_df.show()
+----+-----+
|type|count|
+----+-----+
|   d|    2|
|   c|    4|
|   b|    5|
+----+-----+

>>>

让我知道这是否是您要找的?

谢谢,侯赛因·博拉


推荐阅读