首页 > 解决方案 > 加入两个数据框:丢失的任务

问题描述

我在 PySpark DataFrame 中有时间序列数据。我的每个信号(value列)都应该被分配一个唯一的id. 然而,这些id值是不精确的,需要扩展到双方。原始 DataFrame 如下所示:

df_start
+------+----+-------+
| time | id | value |
+------+----+-------+
|     1|   0|    1.0|
|     2|   1|    2.0|
|     3|   1|    2.0|
|     4|   0|    1.0|
|     5|   0|    0.0|
|     6|   0|    1.0|
|     7|   2|    2.0|
|     8|   2|    3.0|
|     9|   2|    2.0|
|    10|   0|    1.0|
|    11|   0|    0.0|
+------+----+-------+

所需的输出是:

df_desired
+------+----+-------+
| time | id | value |
+------+----+-------+
|     1|   1|    1.0|
|     2|   1|    2.0|
|     3|   1|    2.0|
|     4|   1|    1.0|
|     6|   2|    1.0|
|     7|   2|    2.0|
|     8|   2|    3.0|
|     9|   2|    2.0|
|    10|   2|    1.0|
|    11|   2|    1.0|
+------+----+-------+

所以这里发生了两件事:

幸运的是,对于每个 ID,我知道相对记录时间延迟是多少。目前,我将其转换为绝对正确的记录时间

df_join
+----+-------+-------+
| id | min_t | max_t |
+----+-------+-------+
|   1|      1|      4|
|   2|      6|     11|
+----+-------+-------+

然后我用它来使用连接“过滤”原始数据

df_desired = df_join.join(df_start, 
                          df_start.time.between(df_join.min_t, df_join.max_t)
                         )

这会产生所需的输出。

实际上df_join,至少有 400 000 行,df_start大约有 100 亿行,其中我们保留最多。

当我在我们的集群上运行它时,我有时会收到类似Lost task, ExecutorLostFailure, Container marked as failed, Exit code: 134.

我怀疑执行程序内存不足,但是我没有找到任何解决方案。

标签: apache-sparkpysparkapache-spark-sqlpyspark-sql

解决方案


推荐阅读