sql - 在pyspark中以一秒为粒度的时间范围内计算与一种特定类型的时间的每个时间差的类型
问题描述
DataFrame
我在pyspark中有以下时间序列数据:
(id, timestamp, type)
该
id
列可以是任何整数值,并且表中可以存在许多具有相同 id 的行该
timestamp
列是由整数表示的时间戳(为简化起见)该
type
列是一个字符串类型变量,其中列上的每个不同字符串代表一个类别。其中一个特殊类别是“A”
我的问题如下:
有什么方法可以计算(使用 SQL 或 pyspark DataFrame 操作):
一个时间范围内(如[-5,+5])内所有type='A'行对应的时间戳与时间戳的所有时间差的每种类型的计数,粒度为1秒
例如,对于以下内容DataFrame
:
ts_df = sc.parallelize([
(1,'A',100),(2,'A',1000),(3,'A',10000),
(1,'b',99),(1,'b',99),(1,'b',99),
(2,'b',999),(2,'b',999),(2,'c',999),(2,'c',999),(1,'d',999),
(3,'c',9999),(3,'c',9999),(3,'d',9999),
(1,'b',98),(1,'b',98),
(2,'b',998),(2,'c',998),
(3,'c',9998)
]).toDF(["id","type","ts"])
ts_df.show()
+---+----+-----+
| id|type| ts|
+---+----+-----+
| 1| A| 100|
| 2| A| 1000|
| 3| A|10000|
| 1| b| 99|
| 1| b| 99|
| 1| b| 99|
| 2| b| 999|
| 2| b| 999|
| 2| c| 999|
| 2| c| 999|
| 1| d| 999|
| 3| c| 9999|
| 3| c| 9999|
| 3| d| 9999|
| 1| b| 98|
| 1| b| 98|
| 2| b| 998|
| 2| c| 998|
| 3| c| 9998|
+---+----+-----+
对于-1秒的时间差,结果应该是:
# result for time difference = -1 sec
# b: 5
# c: 4
# d: 2
而对于-2秒的时间差,结果应该是:
# result for time difference = -2 sec
# b: 3
# c: 2
# d: 0
以此类推,以1秒为粒度的时间范围内的任何时间差。
我尝试了许多不同的方法,主要是使用groupBy
,但似乎没有任何效果。
我在如何表达 type= 每一行的时间差方面遇到困难,A
即使我必须针对一个特定的时间差来做。
任何建议将不胜感激!
编辑:
如果我只需要针对一个特定的时差time_difference
执行此操作,那么我可以通过以下方式执行此操作:
time_difference = -1
df_type_A = ts_df.where(F.col("type")=='A').selectExpr("ts as fts")
res = df_type_A.join(ts_df, on=df_type_A.fts+time_difference==ts_df.ts)\
.drop("ts","fts").groupBy(F.col("type")).count()
返回的res
DataFrame 将为我提供一个特定时差的确切信息。我创建了一个循环并通过一遍又一遍地重复相同的查询来解决问题。
但是,还有比这更有效的方法吗?
EDIT2(解决方案) 所以我最后就是这样做的:
df1 = sc.parallelize([
(1,'b',99),(1,'b',99),(1,'b',99),
(2,'b',999),(2,'b',999),(2,'c',999),(2,'c',999),(2,'d',999),
(3,'c',9999),(3,'c',9999),(3,'d',9999),
(1,'b',98),(1,'b',98),
(2,'b',998),(2,'c',998),
(3,'c',9998)
]).toDF(["id","type","ts"])
df1.show()
df2 = sc.parallelize([
(1,'A',100),(2,'A',1000),(3,'A',10000),
]).toDF(["id","type","ts"]).selectExpr("id as fid","ts as fts","type as ftype")
df2.show()
df3 = df2.join(df1, on=df1.id==df2.fid).withColumn("td", F.col("ts")-F.col("fts"))
df3.show()
df4 = df3.groupBy([F.col("type"),F.col("td")]).count()
df4.show()
我会尽快更新性能细节。
谢谢!
解决方案
解决此问题的另一种方法是:
- 将现有数据帧分成两个数据帧 - 有 A 和没有 A
- 在没有 A df 的情况下添加一个新列,它是“ts”和 time_difference 的总和
- 加入数据框、分组依据和计数。
这是一个代码:
from pyspark.sql.functions import lit
time_difference = 1
ts_df_A = (
ts_df
.filter(ts_df["type"] == "A")
.drop("id")
.drop("type")
)
ts_df_td = (
ts_df
.withColumn("ts_plus_td", lit(ts_df['ts'] + time_difference))
.filter(ts_df["type"] != "A")
.drop("ts")
)
joined_df = ts_df_A.join(ts_df_td, ts_df_A["ts"] == ts_df_td["ts_plus_td"])
agg_df = joined_df.groupBy("type").count()
>>> agg_df.show()
+----+-----+
|type|count|
+----+-----+
| d| 2|
| c| 4|
| b| 5|
+----+-----+
>>>
让我知道这是否是您要找的?
谢谢,侯赛因·博拉
推荐阅读
- python-3.x - 如何将 nltk.parse.corenlp.CoreNLPTokenizer 用于斯坦福中文分词器
- java - 带 redisson 的 Tomcat 会话管理器
- mysql - 更改列mysql php 4.4.9中的所有值
- c++ - std::vector<> 使用基类的构造函数并重载派生类后不存在默认构造函数
- sql - SQL Server:函数调用案例
- c# - 有起点的乒乓运动
- javascript - 使用 on change 调用原型函数
- git - git 是否有快捷命令将 HEAD 修复为 HEAD~1
- c# - 如何在处理程序中的事件处理程序中传递自定义参数
- javascript - 尝试在可视代码中导入模块时,它显示 Unexpected token {