python - PySpark:从数据框创建字典的字典?
问题描述
我有以下格式的数据,这些数据是从 Hive 获取到数据框中的:
date, stock, price
1388534400, GOOG, 50
1388534400, FB, 60
1388534400, MSFT, 55
1388620800, GOOG, 52
1388620800, FB, 61
1388620800, MSFT, 55
其中 date 是当天午夜的纪元,我们的数据可以追溯到 10 年左右(超过 8 亿行)。我想得到一本字典如下:
{
'GOOG':
{
'1388534400': 50,
'1388620800': 52
}
'FB':
{
'1388534400': 60,
'1388620800': 61
}
}
一种天真的方法是获取唯一股票列表,然后通过仅过滤掉每只股票的那些行来获取数据帧的子集,但这似乎过于幼稚且效率极低。这可以在 Spark 中轻松完成吗?我目前已经使用 PyHive 在本机 Python 中工作,但由于数据量巨大,我宁愿在集群/Spark 上完成这项工作。
解决方案
在 spark 2.4 中,您可以map_from_arrays
在汇总每只股票的值时使用来构建日期值映射。那么create_map
使用股票代码作为键只是一个使用问题。此示例使用ChainMap
python 3.4 来构建您所描述的最终 dict 结构。
import json
from collections import ChainMap
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession \
.builder \
.appName("example") \
.getOrCreate()
df = spark.createDataFrame([
(1388534400, "GOOG", 50),
(1388534400, "FB", 60),
(1388534400, "MSFT", 55),
(1388620800, "GOOG", 52),
(1388620800, "FB", 61),
(1388620800, "MSFT", 55)]
).toDF("date", "stock", "price")
out = df.groupBy("stock") \
.agg(
map_from_arrays(
collect_list("date"), collect_list("price")).alias("values")) \
.select(create_map("stock", "values").alias("values")) \
.rdd.flatMap(lambda x: x) \
.collect()
print(json.dumps(dict(ChainMap(*out)), indent=4, separators=(',', ': '), sort_keys=True))
这使:
{
"FB": {
"1388534400": 60,
"1388620800": 61
},
"GOOG": {
"1388534400": 50,
"1388620800": 52
},
"MSFT": {
"1388534400": 55,
"1388620800": 55
}
}
但是,正如您所说,您可能实际上不想在内存中创建此字典,因此您最好将其拆分并将相同的字典结构写入不同分区的文件中。
让我们通过将日期截断到给定月份并为每个月和每只股票编写单独的文件来做到这一点:
out = df.groupBy(trunc(expr("CAST(date as TIMESTAMP)"), "month").alias("month"), df["stock"]) \
.agg(
map_from_arrays(
collect_list("date"), collect_list("price")).alias("values")) \
.select("month", "stock", create_map("stock", "values").alias("values"))
out.write.partitionBy("month", "stock").format("json").save("out/prices")
这为您提供了如下结构:
out
└── prices
├── _SUCCESS
└── month=2014-01-01
├── stock=FB
│ └── part-00093-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
├── stock=GOOG
│ └── part-00014-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
└── stock=MSFT
└── part-00152-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
MSFT 文件如下所示:
{"values":{"MSFT":{"1388534400":55,"1388620800":55}}}
虽然“值”列名称可能不在您的字典结构中,但我希望这能说明您可以做什么。
推荐阅读
- apache-spark - 在设定频率后刷新缓存的 Spark Dataframe
- node.js - 在使用 Node.js/Express 进行重定向时,如何防止浏览器将 Authorization 标头发送到新位置?
- php - 为什么当前日期在php中显示当前日期的下一个日期?
- c# - C# LINQ 与 foreach 循环,循环退出后重置值
- javascript - 在 MarkLogic Data Hub Harmonization 标头插件中运行 Xpath 时出错
- mysql - 如何对另一个查询的每个结果进行相同的查询
- jquery - 在jQuery中用''替换管道字符
- batch-file - 批处理脚本:使用 forloops 检查文件夹中是否存在文件
- ios - iOS FCM 推送通知阻塞 UI 线程
- botframework - 为 Bot 框架 Webchat 窗口设置语言