apache-spark - How to count overlapping days in a dataframe in PySpark?
Problem Description
I need to compute, row by row, the number of overlapping days in a dataframe. The data looks like this:
+---+-------------------+-------------------+------------------+
| id|              begin|                end|              days|
+---+-------------------+-------------------+------------------+
|  1|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|
|  1|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|
|  1|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|
|  1|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889|
|  1|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|
|  1|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|
|  1|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|
|  1|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|
|  1|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444|
+---+-------------------+-------------------+------------------+
Here, one of the entries spans the whole of 2019 (365 days), and every other entry overlaps with it. I want a function that computes the total number of days covered, which for this dataset is 365 once the overlapping days are removed.
I actually solved this in R, but I cannot run that kind of for loop in PySpark.
I am looking for output like this:
+-------+-------------------+-------------------+------------------+------------------+
|     id|              begin|                end|              days|           overlap|
+-------+-------------------+-------------------+------------------+------------------+
|      1|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|                 0|
|      1|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778| 7.090277777777778|
|      1|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|           27.1375|
|      1|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|29.584027777777777|
|      1|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444| 47.96944444444444|
|      1|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|10.430555555555555|
|      1|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|14.472222222222221|
|      1|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889| 19.24513888888889|
|      1|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|209.07083333333333|
+-------+-------------------+-------------------+------------------+------------------+
The dates are never in order, and in some cases there is no overlap at all.
Scenario 2: no overlap
+-------+-------------------+-------------------+-----+-----+
|     id|              begin|                end| days| over|
+-------+-------------------+-------------------+-----+-----+
|      2|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|    0|
|      2|2019-12-25 00:00:00|2020-01-01 00:00:00|  7.0|    0|
+-------+-------------------+-------------------+-----+-----+
Scenario 3: partial overlap
+-------+-------------------+-------------------+-----+-----+
|     id|              begin|                end| days| over|
+-------+-------------------+-------------------+-----+-----+
|      3|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|    0|
|      3|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|    5|
+-------+-------------------+-------------------+-----+-----+
Scenario 4: more complex. Here the first entry covers the first 358 days of 2019. The second entry lies entirely inside the first, so all of its days count as overlap. The third entry does not overlap the second, but overlaps the first by 5 days, hence the 5 in the 'over' column.
+-------+-------------------+-------------------+-----+-----+
|     id|              begin|                end| days| over|
+-------+-------------------+-------------------+-----+-----+
|      4|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|    0|
|      4|2019-01-01 00:00:00|2019-11-25 00:00:00|328.0|328.0|
|      4|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|    5|
+-------+-------------------+-------------------+-----+-----+
Basically, I want to know how long the effective period for a given id is. I cannot simply take the maximum and minimum dates and subtract them, because there can be gaps in the period.
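For intuition, the number wanted per id is the length of the union of its intervals. A minimal gaps-and-islands sketch of that in PySpark (assuming Spark 2.4+ and timestamp-typed begin/end; the helper column names are only illustrative, not part of the dataset):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("id").orderBy("begin")
prev = w.rowsBetween(Window.unboundedPreceding, -1)

effective = (df
    # latest end seen among all earlier rows of the same id
    .withColumn("max_end_so_far", F.max("end").over(prev))
    # a new block starts where this row does not touch anything seen before
    .withColumn("new_block", F.when(F.col("max_end_so_far").isNull() |
                                    (F.col("begin") > F.col("max_end_so_far")), 1).otherwise(0))
    .withColumn("block_id", F.sum("new_block").over(w))
    # merge each block into a single interval, then sum the block lengths
    .groupBy("id", "block_id")
    .agg(F.min("begin").alias("b"), F.max("end").alias("e"))
    .groupBy("id")
    .agg(F.sum((F.col("e").cast("long") - F.col("b").cast("long")) / 86400.0)
          .alias("effective_days")))
effective.show()
On the sample above this yields 365.0 for id 1. What I am asking for below, though, is a per-row overlap column, not just this total.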
In R, I created another column called "overlap" and used the Overlap function in a for loop to check every interval against all the others.
The R code that produces the desired output:
abc <- data.frame()
for (i in id) {                      # id: the vector of distinct ids
  xyz <- dataset %>% filter(id == i) %>% arrange(begin)
  xyz$overlap <- 0                   # initialize before accumulating
  for (j in 1:(nrow(xyz) - 1)) {
    k <- j
    while (k < nrow(xyz)) {
      # Overlap() (presumably DescTools::Overlap) returns the length of the intersection
      xyz$overlap[j] <- xyz$overlap[j] +
        Overlap(c(xyz$begin[j], xyz$end[j]), c(xyz$begin[k + 1], xyz$end[k + 1]))
      k <- k + 1
    }
  }
  abc <- bind_rows(abc, xyz)
}
I am still learning PySpark and need help.
Response to @murtihash's code snippet:
Hi, this looks closer to the answer, but it is still not the result I am looking for. Output of the code:
+-------+-------------------+-------------------+-----------------+-------+
|     id|              begin|                end|             days|overlap|
+-------+-------------------+-------------------+-----------------+-------+
|7777777|2019-01-05 01:00:00|2019-04-04 00:00:00|88.95833333333333|      0|
|7777777|2019-04-04 00:00:00|2019-07-11 00:00:00|             98.0|      0|
|7777777|2019-07-11 00:00:00|2019-09-17 00:00:00|             68.0|      1|
|7777777|2019-09-17 00:00:00|2019-09-19 22:01:00|2.917361111111111|      0|
|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|     -1|
|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|     -1|
+-------+-------------------+-------------------+-----------------+-------+
The desired output would be:
+-------+-------------------+-------------------+-----------------+-------+
|     id|              begin|                end|             days|overlap|
+-------+-------------------+-------------------+-----------------+-------+
|7777777|2019-01-05 01:00:00|2019-04-04 00:00:00|88.95833333333333|      0|
|7777777|2019-04-04 00:00:00|2019-07-11 00:00:00|             98.0|      0|
|7777777|2019-07-11 00:00:00|2019-09-17 00:00:00|             68.0|      0|
|7777777|2019-09-17 00:00:00|2019-09-19 22:01:00|2.917361111111111|      0|
|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|103.082|
|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|      0|
+-------+-------------------+-------------------+-----------------+-------+
Explanation: the first four rows have no overlap. Rows 5 and 6 cover exactly the same period (and overlap no other rows), so exactly one of rows 5 and 6 should get an overlap of 103.08 days.
Update: this does not work in the following case. Output of @murtihash's snippet:
+-------+-------------------+-------------------+------------------+-------+
|  imono|              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|9347774|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|    0.0|
|9347774|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|    0.0|
|9347774|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|    0.0|
|9347774|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444|    0.0|
|9347774|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|    0.0|
|9347774|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|    0.0|
|9347774|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889|    0.0|
|9347774|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|    7.0|
|9347774|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|    0.0|
+-------+-------------------+-------------------+------------------+-------+
Desired output: either this
+-------+-------------------+-------------------+------------------+-------+
|  imono|              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|9347774|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|    0.0|
|9347774|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|    0.0|
|9347774|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|    0.0|
|9347774|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444|    0.0|
|9347774|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|    0.0|
|9347774|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|    0.0|
|9347774|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889|    0.0|
|9347774|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|    365|
|9347774|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|    0.0|
+-------+-------------------+-------------------+------------------+-------+
or this
+-------+-------------------+-------------------+------------------+-------+
|  imono|              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|9347774|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|    7.1|
|9347774|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|   27.1|
|9347774|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|   29.5|
|9347774|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444|   48.0|
|9347774|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|   10.4|
|9347774|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|   14.5|
|9347774|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889|   19.2|
|9347774|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|    0.0|
|9347774|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|  209.1|
+-------+-------------------+-------------------+------------------+-------+
Explanation: the second-to-last entry spans the whole year, and every other entry overlaps with it. So either the second-to-last entry gets overlap = 365, or every other entry gets its own length as overlap and the second-to-last entry gets overlap = 0.
Update 2: this does not work in the following case either. Output of @murtihash's snippet (UPDATE2):
+-------+-------------------+-------------------+------------------+-------+
|  imono|              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|9395123|2019-01-19 05:01:00|2019-02-06 00:00:00|17.790972222222223|   17.0|
|9395123|2019-02-06 00:00:00|2019-06-17 00:00:00|             131.0|    0.0|
|9395123|2019-01-19 05:01:00|2020-01-01 00:00:00| 346.7909722222222|    0.0|
|9395123|2019-06-17 00:00:00|2020-01-01 00:00:00|             198.0|    0.0|
+-------+-------------------+-------------------+------------------+-------+
Desired output:
+-------+-------------------+-------------------+------------------+-------+
|     id|              begin|                end|              days|overlap|
+-------+-------------------+-------------------+------------------+-------+
|8888888|2019-01-19 05:01:00|2019-02-06 00:00:00|17.790972222222223|   17.8|
|8888888|2019-02-06 00:00:00|2019-06-17 00:00:00|             131.0|    0.0|
|8888888|2019-01-19 05:01:00|2020-01-01 00:00:00| 346.7909722222222|    329|
|8888888|2019-06-17 00:00:00|2020-01-01 00:00:00|             198.0|    0.0|
+-------+-------------------+-------------------+------------------+-------+
I do not really understand what your snippet does, so I cannot adapt it to my purposes myself. Thanks for your help!
Solution
For Spark 2.4+, you can use sequence (to generate the date range of each row), collect_list, and a combination of array functions and higher-order functions to get the desired overlap.
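To build intuition for those pieces first, a small illustrative sketch (the demo dataframe and the active spark session are assumptions, not part of the original answer): sequence expands a row's range into one timestamp per day, and the size of an array_intersect of two such arrays, minus one, is the number of whole days the two ranges share.
from pyspark.sql import functions as F

# two ranges that share 5 days, as in Scenario 3 above
demo = spark.createDataFrame(
    [("2019-12-15 00:00:00", "2019-12-25 00:00:00"),
     ("2019-12-20 00:00:00", "2020-01-01 00:00:00")],
    ["begin", "end"])

demo.withColumn(
    "seq", F.expr("sequence(to_timestamp(begin), to_timestamp(end), interval 1 day)")
).show(truncate=False)
# each seq holds one timestamp per day from begin to end inclusive, so ranges
# sharing n days yield arrays whose intersection has n + 1 elements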
df.show() #sample dataframe
#+---+-------------------+-------------------+-----+
#| id|              begin|                end| days|
#+---+-------------------+-------------------+-----+
#| 2|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|
#| 2|2019-12-25 00:00:00|2020-01-01 00:00:00| 7.0|
#| 3|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|
#| 3|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|
#| 4|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|
#| 4|2019-01-01 00:00:00|2019-11-25 00:00:00|328.0|
#| 4|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|
#+---+-------------------+-------------------+-----+
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w1 = Window().partitionBy("id").orderBy("begin")

(df
 # seq: one timestamp per day covered by [begin, end]
 .withColumn("seq", F.expr("sequence(to_timestamp(begin), to_timestamp(end), interval 1 day)"))
 # seq1: the flattened day arrays of every *other* row of the same id that overlaps this one
 .withColumn("seq1", F.expr("""flatten(filter(collect_list(seq) over (partition by id),
                               x -> arrays_overlap(x, seq) == True and seq != x))"""))
 # overlap: shared day boundaries minus one; the first row per id is pinned to 0
 .withColumn("overlap", F.when(F.row_number().over(w1) == 1, F.lit(0))
                         .otherwise(F.size(F.array_intersect("seq", "seq1")) - 1))
 .orderBy("id", "end").drop("seq", "seq1").show())
#+---+-------------------+-------------------+-----+-------+
#| id|              begin|                end| days|overlap|
#+---+-------------------+-------------------+-----+-------+
#|  2|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|      0|
#|  2|2019-12-25 00:00:00|2020-01-01 00:00:00|  7.0|      0|
#|  3|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|      0|
#|  3|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|      5|
#|  4|2019-01-01 00:00:00|2019-11-25 00:00:00|328.0|    328|
#|  4|2019-01-01 00:00:00|2019-12-25 00:00:00|358.0|      0|
#|  4|2019-12-20 00:00:00|2020-01-01 00:00:00| 12.0|      5|
#+---+-------------------+-------------------+-----+-------+
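Reading the snippet step by step: seq expands each row's [begin, end] into an array of daily timestamps; seq1 gathers and flattens the seq arrays of every other row of the same id that overlaps this row; the overlap is then size(array_intersect(seq, seq1)) - 1, i.e. the shared day boundaries minus one, with the first row per id pinned to 0. One way to inspect the intermediates before they are dropped (a debugging sketch, reusing F and df from above):
(df.withColumn("seq", F.expr("sequence(to_timestamp(begin), to_timestamp(end), interval 1 day)"))
   .withColumn("seq1", F.expr("""flatten(filter(collect_list(seq) over (partition by id),
                                 x -> arrays_overlap(x, seq) == True and seq != x))"""))
   .select("id", "begin", "end",
           F.size("seq").alias("day_boundaries_in_row"),
           F.size(F.array_intersect("seq", "seq1")).alias("shared_boundaries"))
   .show())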
UPDATE: this should cover all of the cases above:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w1 = Window().partitionBy("id").orderBy("begin")
w2 = Window().partitionBy("id", "begin", "end").orderBy("begin")
w3 = Window().partitionBy("id", "begin", "end")
w4 = Window().partitionBy("id", "begin", "end", "maxrownum").orderBy("begin")

(df
 .withColumn("seq", F.expr("sequence(to_timestamp(begin), to_timestamp(end), interval 1 day)"))
 # maxrownum counts exact duplicates of (id, begin, end); rowNum numbers them
 .withColumn("maxrownum", F.max(F.row_number().over(w2)).over(w3))
 .withColumn("rowNum", F.row_number().over(w4))
 .withColumn("seq1", F.expr("""flatten(filter(collect_list(seq) over (partition by id order by begin),
                               x -> arrays_overlap(x, seq) == True and seq != x))"""))
 .withColumn("overlap",
             F.when(F.row_number().over(w1) == 1, F.lit(0))
              .when(F.size(F.array_intersect("seq", "seq1")) != 0,
                    F.size(F.array_intersect("seq", "seq1")) - 1)
              # duplicated periods: charge every copy except the last with its full days
              .when((F.col("maxrownum") != 1) & (F.col("rowNum") < F.col("maxrownum")), F.col("days"))
              .otherwise(F.lit(0)))
 .orderBy("id", "end").drop("seq", "seq1", "maxrownum", "rowNum").show())
#+-------+-------------------+-------------------+-----------------+-----------------+
#|     id|              begin|                end|             days|          overlap|
#+-------+-------------------+-------------------+-----------------+-----------------+
#|7777777|2019-01-05 01:00:00|2019-04-04 00:00:00|88.95833333333333|              0.0|
#|7777777|2019-04-04 00:00:00|2019-07-11 00:00:00|             98.0|              0.0|
#|7777777|2019-07-11 00:00:00|2019-09-17 00:00:00|             68.0|              0.0|
#|7777777|2019-09-17 00:00:00|2019-09-19 22:01:00|2.917361111111111|              0.0|
#|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|103.0826388888889|
#|7777777|2019-09-19 22:01:00|2020-01-01 00:00:00|103.0826388888889|              0.0|
#+-------+-------------------+-------------------+-----------------+-----------------+
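The extra windows exist only to handle exact duplicate rows (rows 5 and 6 here): maxrownum counts how many copies share the same (id, begin, end), and rowNum numbers them, so every copy except the last is charged its full days as overlap. A sketch to watch just that bookkeeping (reusing df, F, w2, w3, w4 from the snippet above):
(df
 .withColumn("maxrownum", F.max(F.row_number().over(w2)).over(w3))
 .withColumn("rowNum", F.row_number().over(w4))
 .select("id", "begin", "end", "maxrownum", "rowNum")
 .show())
# maxrownum > 1 flags periods that occur more than once; rowNum < maxrownum picks
# every copy except the last, which the when(...) branch above charges with `days`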
UPDATE2:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w1 = Window().partitionBy("id").orderBy("begin")
w2 = Window().partitionBy("id", "begin", "end").orderBy("begin")
w3 = Window().partitionBy("id", "begin", "end")
w4 = Window().partitionBy("id", "begin", "end", "maxrownum").orderBy("begin")

(df
 .withColumn("seq", F.expr("sequence(to_timestamp(begin), to_timestamp(end), interval 1 day)"))
 .withColumn("maxrownum", F.max(F.row_number().over(w2)).over(w3))
 .withColumn("rowNum", F.row_number().over(w4))
 .withColumn("seq1", F.expr("""flatten(filter(collect_list(seq) over (partition by id),
                               x -> arrays_overlap(x, seq) == True and seq != x))"""))
 .withColumn("overlap",
             F.when(F.row_number().over(w1) == 1, F.lit(0))
              .when(F.size(F.array_intersect("seq", "seq1")) != 0,
                    F.size(F.array_intersect("seq", "seq1")) - 1)
              .when((F.col("maxrownum") != 1) & (F.col("rowNum") < F.col("maxrownum")), F.col("days"))
              # remaining non-duplicate rows (empty intersection) are charged their full days
              .when(F.col("maxrownum") == 1, F.col("days"))
              .otherwise(F.lit(0)))
 # maps any exact value 1 to 0, clearing the single-boundary-day artifact
 # (note: replace() with no subset applies to every numeric column)
 .replace(1, 0)
 .orderBy("id", "end").drop("seq", "seq1", "rowNum", "maxrownum").show())
#+-------+-------------------+-------------------+------------------+------------------+
#|     id|              begin|                end|              days|           overlap|
#+-------+-------------------+-------------------+------------------+------------------+
#|9347774|2019-01-01 00:00:00|2019-01-08 02:10:00| 7.090277777777778|               7.0|
#|9347774|2019-01-08 02:10:00|2019-02-04 05:28:00|           27.1375|           27.1375|
#|9347774|2019-02-04 05:28:00|2019-03-05 19:29:00|29.584027777777777|29.584027777777777|
#|9347774|2019-03-05 19:29:00|2019-04-22 18:45:00| 47.96944444444444| 47.96944444444444|
#|9347774|2019-04-22 18:45:00|2019-05-03 05:05:00|10.430555555555555|10.430555555555555|
#|9347774|2019-05-03 05:05:00|2019-05-17 16:25:00|14.472222222222221|14.472222222222221|
#|9347774|2019-05-17 16:25:00|2019-06-05 22:18:00| 19.24513888888889| 19.24513888888889|
#|9347774|2019-01-01 00:00:00|2020-01-01 00:00:00|             365.0|               0.0|
#|9347774|2019-06-05 22:18:00|2020-01-01 00:00:00|209.07083333333333|209.07083333333333|
#+-------+-------------------+-------------------+------------------+------------------+
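With an overlap column in place, the total the question ultimately asks for (effective days per id) follows by aggregation under either desired-output convention; a sketch, where df_with_overlap stands for the result of the snippet above:
# effective days = all days minus the days counted as overlap
(df_with_overlap
 .groupBy("id")
 .agg((F.sum("days") - F.sum("overlap")).alias("effective_days"))
 .show())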