首页 > 解决方案 > PySpark - 从数据框创建多个 json 文件

问题描述

我有以下格式的数据,这些数据是从 Hive 获取到数据框中的:

date, stock, price
1388534400, GOOG, 50
1388534400, FB, 60
1388534400, MSFT, 55
1388620800, GOOG, 52
1388620800, FB, 61
1388620800, MSFT, 55

其中 date 是当天午夜的纪元,我们的数据可以追溯到 10 年左右(超过 8 亿行)。我的目标是最终得到一堆 JSON 文件,每只股票一个,看起来像:

GOOG.json:
{
'1388534400': 50,
'1388620800': 52
}

FB.json:
{
'1388534400': 60,
'1388620800': 61
}

一种天真的方法是获取唯一股票列表,然后通过仅过滤掉每只股票的那些行来获取数据框的子集,但这似乎过于幼稚且效率极低。这可以在 Spark 中轻松完成吗?我目前已经使用 PyHive 在本机 Python 中工作,但由于数据量巨大,我宁愿在集群/Spark 上完成这项工作。

标签: pythonapache-sparkpyspark

解决方案


是的。这很简单。您可以使用 DataFrameWriter 并使用 partitionBy - 指定要分区的列(在您的情况下它将是库存)

从 Pyspark 文档中:

df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))

对你来说,这将是

df.write.partitionBy('stock').json(os.path.join(tempfile.mkdtemp(), 'data'))

注意几点:

  • 这可能需要大量改组,具体取决于 Hive 表的布局方式。
  • 即使在分区之后,根据该分区中有多少记录,每个分区最终可能会有多个文件。例如,30% 的活动可能是针对 GOOG 的,在这种情况下,针对 GOOG 的分区将比其他分区大得多。如果遇到,您只需要为每个分区运行一个文件连接脚本。但是,每个分区中的文件仍将用于单一库存。

推荐阅读