Split and explode in pyspark; Py4JJavaError: java.lang.OutOfMemoryError: Java heap space

Problem description

I have this dataset:

root
|-- code: double (nullable = true)
|-- Creator: string (nullable = true)
|-- Created_datetime: timestamp (nullable = true)
|-- Last_modified_datetime: timestamp (nullable = true)
|-- Product_name: string (nullable = true)
|-- Countries_en: string (nullable = true)
|-- Traces_en: string (nullable = true)
|-- Additives_tags: string (nullable = true)
|-- Main_category_en: string (nullable = true)
|-- Image_url: string (nullable = true)
|-- Quantity: string (nullable = true)
|-- Packaging_tags: string (nullable = true)
|-- countries: string (nullable = true)
|-- Categories_en: string (nullable = true)
|-- Ingredients_text: string (nullable = true)
|-- Additives_en: string (nullable = true)
|-- Energy-kcal_100g: double (nullable = true)
|-- Fat_100g: double (nullable = true)
|-- Saturated-fat_100g: double (nullable = true)
|-- Sugars_100g: double (nullable = true)
|-- Salt_100g: double (nullable = true)
|-- sodium_100g: double (nullable = true)
|-- current_time: string (nullable = false)
|-- time_diff_hours: double (nullable = true)
|-- time_diff_days: integer (nullable = true)
|-- time_diff_months: double (nullable = true)
|-- time_diff_years: integer (nullable = true) 

I want to split and explode it into a list of the product names in each category.

This is the code I tried:

# create df with grouped products
categoriesDF = productsDF\
    .select("Categories_en", "Product_name")\
    .groupBy("Categories_en")\
    .agg(F.collect_set("Product_name").cast("string").alias("Products"))\
    .orderBy("Categories_en")

# split df to show each product in a separate column
categoriesDF\
    .where(col("Products") != "null")\
    .select(
        "Categories_en",
        F.split("Products", ",").alias("Products"),
        F.posexplode(F.split("Products", ",")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "Categories_en",
        F.concat(F.lit("Products"), F.col("pos").cast("string")).alias("name"),
        F.expr("Products[pos]").alias("val")
    )\
    .groupBy("Categories_en").pivot("name").agg(F.first("val"))\
    .show()

But I get this error:

Py4JJavaError: An error occurred while calling o590.agg. : java.lang.OutOfMemoryError: Java heap space

Using the collect function I got this; it doesn't show the actual lists:

+--------------------+-------------------------+
|       Categories_en|collect_set(Product_name)|
+--------------------+-------------------------+
|                null|     [thon à l'huile d...|
|       Acacia honeys|     [Miel Acacia Damb...|
|Alimenti-in-scato...|     [Formula 1 Mirtillo]|
|Aliments-et-boiss...|        [Paprika flavour]|
|Alkoholische-getr...|                 [Aperol]|
|Alkoholische-getr...|     [Pitú Do Brasil (...|
|          Baby foods|     [Carotte pomme de...|
|Baby foods,Baby m...|     [Babylac biscuit,...|
|Baby foods,Baby m...|             [NAN A.R. 2]|
|Baby foods,Baby m...|     [lait de croissan...|
|Baby foods,Baby m...|        [Nan evolia Ha 1]|
|Baby foods,Cereal...|                [Babylac]|
|Baby foods,From 1...|     [NaturNes Petits ...|
|Baby foods,From 4...|                [Cerelac]|
|Baby foods,From 6...|     [Hipp Bio Baby Ko...|
|Baby foods,From 8...|     [Risotto de Légumes]|
|Baby foods,Main m...|     [Ratatouille blé ...|
|Baby foods,Main m...|     [NaturNes Mijoté ...|
|Baby foods,Main m...|     [Petits Pois Caro...|
|Baby foods,Main m...|            [Olvarit bio]|

I want to see the full list of products for each category (using pyspark).

Tags: pyspark, apache-spark-sql

Solution
