pyspark - split and explode in pyspark; Py4JJavaError: java.lang.OutOfMemoryError: Java heap space
Problem description
I have this dataset:
root
|-- code: double (nullable = true)
|-- Creator: string (nullable = true)
|-- Created_datetime: timestamp (nullable = true)
|-- Last_modified_datetime: timestamp (nullable = true)
|-- Product_name: string (nullable = true)
|-- Countries_en: string (nullable = true)
|-- Traces_en: string (nullable = true)
|-- Additives_tags: string (nullable = true)
|-- Main_category_en: string (nullable = true)
|-- Image_url: string (nullable = true)
|-- Quantity: string (nullable = true)
|-- Packaging_tags: string (nullable = true)
|-- countries: string (nullable = true)
|-- Categories_en: string (nullable = true)
|-- Ingredients_text: string (nullable = true)
|-- Additives_en: string (nullable = true)
|-- Energy-kcal_100g: double (nullable = true)
|-- Fat_100g: double (nullable = true)
|-- Saturated-fat_100g: double (nullable = true)
|-- Sugars_100g: double (nullable = true)
|-- Salt_100g: double (nullable = true)
|-- sodium_100g: double (nullable = true)
|-- current_time: string (nullable = false)
|-- time_diff_hours: double (nullable = true)
|-- time_diff_days: integer (nullable = true)
|-- time_diff_months: double (nullable = true)
|-- time_diff_years: integer (nullable = true)
I want to split and explode this into columns holding the list of product names per category.
Here is the code I tried:
# imports assumed by the snippet below
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# create df with grouped products; the cast to string flattens the
# collected array into one comma-separated value
categoriesDF = productsDF\
    .select("Categories_en", "Product_name")\
    .groupBy("Categories_en")\
    .agg(F.collect_set("Product_name").cast("string").alias("Products"))\
    .orderBy("Categories_en")

# split the Products string and pivot each product into a separate column
categoriesDF\
    .where(col("Products").isNotNull())\
    .select("Categories_en",
            F.split("Products", ",").alias("Products"),
            F.posexplode(F.split("Products", ",")).alias("pos", "val"))\
    .drop("val")\
    .select("Categories_en",
            F.concat(F.lit("Products"), F.col("pos").cast("string")).alias("name"),
            F.expr("Products[pos]").alias("val"))\
    .groupBy("Categories_en").pivot("name").agg(F.first("val"))\
    .show()
But I get this error:
Py4JJavaError: An error occurred while calling o590.agg. : java.lang.OutOfMemoryError: Java heap space
Using the collect_set aggregate alone I got this; it does not show the actual lists:
+--------------------+-------------------------+
|       Categories_en|collect_set(Product_name)|
+--------------------+-------------------------+
| null| [thon à l'huile d...|
| Acacia honeys| [Miel Acacia Damb...|
|Alimenti-in-scato...| [Formula 1 Mirtillo]|
|Aliments-et-boiss...| [Paprika flavour]|
|Alkoholische-getr...| [Aperol]|
|Alkoholische-getr...| [Pitú Do Brasil (...|
| Baby foods| [Carotte pomme de...|
|Baby foods,Baby m...| [Babylac biscuit,...|
|Baby foods,Baby m...| [NAN A.R. 2]|
|Baby foods,Baby m...| [lait de croissan...|
|Baby foods,Baby m...| [Nan evolia Ha 1]|
|Baby foods,Cereal...| [Babylac]|
|Baby foods,From 1...| [NaturNes Petits ...|
|Baby foods,From 4...| [Cerelac]|
|Baby foods,From 6...| [Hipp Bio Baby Ko...|
|Baby foods,From 8...| [Risotto de Légumes]|
|Baby foods,Main m...| [Ratatouille blé ...|
|Baby foods,Main m...| [NaturNes Mijoté ...|
|Baby foods,Main m...| [Petits Pois Caro...|
|Baby foods,Main m...|            [Olvarit bio]|
+--------------------+-------------------------+
I want to see the full list of products for each category (using pyspark).
Solution