pyspark - 有没有加入许多(1000+)DF 的有效方法?
问题描述
我对 Spark 还很陌生,我有一个 DF 可以将 8000 个时间序列堆叠在一起。这些时间序列作为单独的列更有意义,所以我决定一一过滤它们并将它们加入新的主 DF 中的时间戳。
到目前为止一切顺利,但是一旦我尝试得到OOM 错误。
# My stacked DF and the name of the columns
stocks_df.cache()
tickers = ["TS1", "TS2", "TS3", ...]
def filter_for_symbol(ticker):
security = stocks_df.filter(stocks_df["Symbol"] == ticker).drop("Symbol")
# Avoids conflict in column names...
# for f in features:
# security = security.withColumnRenamed(f, ticker + "_" + f)
return security
# Aggregation Loop
acc_df = filter_for_symbol(tickers[0])
for ticker in tickers[1:]:
security = filter_for_symbol(ticker)
acc_df = acc_df.join(security, on="Date", how="outer")
在速度和/或内存管理方面做我正在做的事情的更好方法是什么?
更新
我所拥有的,stocks_df: dim = (8000 time series * 3125 entries) x 9 features
+-------------+--------+------+-----+------+
| Date | Symbol | Open | ... | Close|
+-------------+--------+------+-----+------+
| 2020/01/01 | TS1 | 10.0 | ... | 12.2 |
| 2020/01/02 | TS1 | 12.2 | ... | 13.2 |
| ... | ... | ... | ... | ... |
+-------------+--------+------+-----+------+
| 2020/01/01 | TS2 | 29.9 | ... | 32.2 |
| 2020/01/02 | TS2 | 32.2 | ... | 19.2 |
| ... | ... | ... | ... | ... |
+-------------+--------+------+-----+------+
| ... | ... | ... | ... | ... |
+-------------+--------+------+-----+------+
我想得到什么:dim = 3125 x (8000 * 9)
+-------------+----------+-------+-----------+-----------+--------+
| Date | TS1_Open : ... : TS1_Close | TS2_Close | TS2_..|
+-------------+----------+-------+-----------+-----------+--------+
| 2020/01/01 | 10.0 : ... : 12.2 | 12.2 : ... |
| 2020/01/02 | 12.2 : ... : 13.2 | 13.2 : ... |
| ... | ... : ... : ... | ... : ... |
+-------------+----------+-------+-----------+-----------+--------+
我得到的是:java.lang.OOM
解决方案
只是pivot
你的数据框呢?
df.show(10, False)
+----------+-----+----+-----+
|Date |ymbol|Open|Close|
+----------+-----+----+-----+
|2020/01/01|TS1 |10.0|12.2 |
|2020/01/02|TS1 |12.2|13.2 |
|2020/01/01|TS2 |29.9|32.2 |
|2020/01/02|TS2 |32.2|19.2 |
+----------+-----+----+-----+
cols = df.columns[2:]
tickers = ['TS1', 'TS2']
import pyspark.sql.functions as f
df.groupBy('Date') \
.pivot('ymbol', tickers) \
.agg(*map(lambda x: f.sum(f.col(x)).alias(x), cols)) \
.show(10, False)
+----------+--------+---------+--------+---------+
|Date |TS1_Open|TS1_Close|TS2_Open|TS2_Close|
+----------+--------+---------+--------+---------+
|2020/01/01|10.0 |12.2 |29.9 |32.2 |
|2020/01/02|12.2 |13.2 |32.2 |19.2 |
+----------+--------+---------+--------+---------+
推荐阅读
- video - 具有 H.264 的 mpeg-4 容器所需的最小 Atom/Box 集(一个流,渐进式视频,无音频)
- css - 您如何为元素设置动画以使其从页面顶部滑入。元素在带有网格的页面中居中
- certificate - 如何或在哪里下载我的 AWS IoT 根 CA?
- r - 如何在单变量动物园系列中添加一列?
- linux - 如何使用 shell 脚本创建 .log 文件序列?
- javascript - 为什么 Javascript Intellisense 在某些情况下有效,而在其他情况下无效?
- http - NoSuchMethodError:类 'Text' 在 Flutter 中没有实例方法 'split'
- mysql - SQL - 选择包含至少一个 NULL 值的列?
- python - 尝试编写脚本,将下载文件夹内的所有子文件夹
- flutter - Flutter:local_auth 包在“取消”选项时返回 false