Is there an efficient way to join many (1000+) DFs?

Problem Description

I'm fairly new to Spark. I have a DF that stacks 8000 time series on top of each other. These time series make more sense as separate columns, so I decided to filter them out one by one and join each of them on the timestamp into a new master DF.

So far so good, but as soon as I try to run it I get an OOM error.

# My stacked DF and the name of the columns
stocks_df.cache()
tickers = ["TS1", "TS2", "TS3", ...]

def filter_for_symbol(ticker):
    security = stocks_df.filter(stocks_df["Symbol"] == ticker).drop("Symbol")
    # Avoids conflict in column names...
    # for f in features:
    #     security = security.withColumnRenamed(f, ticker + "_" + f)
    return security

# Aggregation Loop
acc_df = filter_for_symbol(tickers[0])
for ticker in tickers[1:]:
    security = filter_for_symbol(ticker)
    acc_df = acc_df.join(security, on="Date", how="outer")

What would be a better way to do this, in terms of speed and/or memory management?


Update

What I have, stocks_df: dim = (8000 time series * 3125 entries) x 9 features

+-------------+--------+------+-----+------+
|    Date     | Symbol | Open | ... | Close|
+-------------+--------+------+-----+------+
| 2020/01/01  |  TS1   | 10.0 | ... | 12.2 |
| 2020/01/02  |  TS1   | 12.2 | ... | 13.2 |
|    ...      |  ...   | ...  | ... | ...  |
+-------------+--------+------+-----+------+
| 2020/01/01  |  TS2   | 29.9 | ... | 32.2 |
| 2020/01/02  |  TS2   | 32.2 | ... | 19.2 |
|    ...      |  ...   | ...  | ... | ...  |
+-------------+--------+------+-----+------+
|    ...      |  ...   | ...  | ... | ...  |
+-------------+--------+------+-----+------+

What I want to get: dim = 3125 x (8000 * 9)

+-------------+----------+-------+-----------+----------+--------+
|    Date     | TS1_Open :  ...  : TS1_Close | TS2_Open :  ...   |
+-------------+----------+-------+-----------+----------+--------+
| 2020/01/01  |   10.0   :  ...  :    12.2   |   29.9   :  ...   |
| 2020/01/02  |   12.2   :  ...  :    13.2   |   32.2   :  ...   |
|    ...      |   ...    :  ...  :    ...    |   ...    :  ...   |
+-------------+----------+-------+-----------+----------+--------+

What I get: java.lang.OutOfMemoryError

Tags: pyspark

Solution


Why not just pivot your DataFrame?

df.show(10, False)

+----------+------+----+-----+
|Date      |Symbol|Open|Close|
+----------+------+----+-----+
|2020/01/01|TS1   |10.0|12.2 |
|2020/01/02|TS1   |12.2|13.2 |
|2020/01/01|TS2   |29.9|32.2 |
|2020/01/02|TS2   |32.2|19.2 |
+----------+------+----+-----+

cols = df.columns[2:]
tickers = ['TS1', 'TS2']

import pyspark.sql.functions as f

df.groupBy('Date') \
  .pivot('Symbol', tickers) \
  .agg(*map(lambda x: f.sum(f.col(x)).alias(x), cols)) \
  .show(10, False)

+----------+--------+---------+--------+---------+
|Date      |TS1_Open|TS1_Close|TS2_Open|TS2_Close|
+----------+--------+---------+--------+---------+
|2020/01/01|10.0    |12.2     |29.9    |32.2     |
|2020/01/02|12.2    |13.2     |32.2    |19.2     |
+----------+--------+---------+--------+---------+
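
On the real dataset you would presumably derive the ticker list and the feature columns from stocks_df itself rather than typing them out. A minimal sketch of that, assuming the question's stocks_df with its Date and Symbol key columns plus the remaining feature columns (wide_df is just a name picked here for the result):

import pyspark.sql.functions as f

# Derive the full ticker list from the data instead of hardcoding it.
# Passing the list to pivot() avoids an extra pass over the data to
# collect the distinct values.
tickers = [r["Symbol"] for r in stocks_df.select("Symbol").distinct().collect()]

# Every column except the two keys is a feature to pivot.
cols = [c for c in stocks_df.columns if c not in ("Date", "Symbol")]

wide_df = (
    stocks_df
    .groupBy("Date")
    .pivot("Symbol", tickers)
    # One row per (Date, Symbol) pair, so first() is enough; sum() behaves the same here.
    .agg(*[f.first(f.col(c)).alias(c) for c in cols])
)

Since there is a single row per (Date, Symbol) pair, first is as good an aggregate as sum. Supplying the values list to pivot also keeps you clear of the spark.sql.pivotMaxValues limit, which only applies when Spark has to auto-detect the pivot values.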
