apache-spark - 加入两个数据框:丢失的任务
问题描述
我在 PySpark DataFrame 中有时间序列数据。我的每个信号(value
列)都应该被分配一个唯一的id
. 然而,这些id
值是不精确的,需要扩展到双方。原始 DataFrame 如下所示:
df_start
+------+----+-------+
| time | id | value |
+------+----+-------+
| 1| 0| 1.0|
| 2| 1| 2.0|
| 3| 1| 2.0|
| 4| 0| 1.0|
| 5| 0| 0.0|
| 6| 0| 1.0|
| 7| 2| 2.0|
| 8| 2| 3.0|
| 9| 2| 2.0|
| 10| 0| 1.0|
| 11| 0| 0.0|
+------+----+-------+
所需的输出是:
df_desired
+------+----+-------+
| time | id | value |
+------+----+-------+
| 1| 1| 1.0|
| 2| 1| 2.0|
| 3| 1| 2.0|
| 4| 1| 1.0|
| 6| 2| 1.0|
| 7| 2| 2.0|
| 8| 2| 3.0|
| 9| 2| 2.0|
| 10| 2| 1.0|
| 11| 2| 1.0|
+------+----+-------+
所以这里发生了两件事:
- 该
id
列不够精确:每个都id
开始记录 a(此处为 1 和 1)时间步长,并结束 b 时间步长(此处为 1 和 2)至早。因此,我必须用它们各自的 id 替换一些零。 - 在“填充”
id
列中的条目后,我删除所有剩余的行id=0
。(此处仅适用于带有 . 的行time=5
。)
幸运的是,对于每个 ID,我知道相对记录时间延迟是多少。目前,我将其转换为绝对正确的记录时间
df_join
+----+-------+-------+
| id | min_t | max_t |
+----+-------+-------+
| 1| 1| 4|
| 2| 6| 11|
+----+-------+-------+
然后我用它来使用连接“过滤”原始数据
df_desired = df_join.join(df_start,
df_start.time.between(df_join.min_t, df_join.max_t)
)
这会产生所需的输出。
实际上df_join
,至少有 400 000 行,df_start
大约有 100 亿行,其中我们保留最多。
当我在我们的集群上运行它时,我有时会收到类似Lost task, ExecutorLostFailure, Container marked as failed, Exit code: 134
.
我怀疑执行程序内存不足,但是我没有找到任何解决方案。
解决方案
推荐阅读
- wordpress - 如何从innerBlocks中排除父块?
- c - sscanf_s 没有存储正确的模式
- java - 如何使用 JDK 11 java.net.http.HttpClient 上传文件?
- python-3.x - 权限错误,无法在 jupyter 上导入 PIL.image
- mysql - 搜索词有多个值时的Mysql SELECT
- github-api - 使用 Github Graph Api V4 获取存储库贡献者
- php - 过去 30 天“单独”选择 SUM
- docker - 在不受控制的 CI 环境中连接到不安全的本地 docker 注册表
- ssas - 将 sql rank 转换为 mdx rank
- python - 无法使用原始 base64 表单和 python xmlrpc 客户端加载种子文件