In PySpark, how do I use a value derived from one column to get data from another column?

Problem description

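I need to derive a value from one column (read from a JSON file; the value is an array of arrays) and use that derived value as a key to select from the next column, which contains JSON data.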

Example:

Consider the following two elements of my record:

categories: [["Movies & TV", "Movies"]]
salesRank: {
  "Appliances": null, "Arts_ Crafts & Sewing": null, 
  "Automotive": null, "Baby": null, "Beauty": null, 
  "Books": null, "Camera & Photo": null, 
  "Cell Phones & Accessories": null, "Clothing": null, 
  "Computers & Accessories": null, "Electronics": null,
  "Gift Cards Store": null, "Grocery & Gourmet Food": null, 
  "Health & Personal Care": null, "Home & Kitchen": null, 
  "Home Improvement": null, "Industrial & Scientific": null, 
  "Jewelry": null, "Kitchen & Dining": null, "Magazines": null, 
  "Movies & TV": 1084845, "Music": null, "Musical Instruments": null, 
  "Office Products": null, "Patio_ Lawn & Garden": null, 
  "Pet Supplies": null, "Prime Pantry": null, "Shoes": null, 
  "Software": null, "Sports & Outdoors": null, "Toys & Games": null, 
  "Video Games": null, "Watches": null}

I select the category for my record (to insert into an RDBMS) with col('categories').getItem(0).getItem(0).alias("categories")

This gives me "Movies & TV".

Now I need to get the salesRank, i.e. the value stored under "Movies & TV" in salesRank. How can I get it?

Tags: pyspark, pyspark-dataframes

Solution


I'll try to replicate your DataFrame.

# `spark` is an existing SparkSession, e.g. the one the pyspark shell provides
df = spark.createDataFrame(
    [
        (1, '{"a": true, "b":98, "c":"xxx1"}', [['a','b']]),
        (2, '{"a": false, "b":98, "c":"xxx2"}', [['c','a']]),
        (3, '{"a": true, "b":99, "c":"xxx3"}', [['b','c']]),
    ],
    schema=['id', 'salesRank', 'categories'],
)
df.show()

+---+--------------------+----------+
| id|           salesRank|categories|
+---+--------------------+----------+
|  1|{"a": true, "b":9...|  [[a, b]]|
|  2|{"a": false, "b":...|  [[c, a]]|
|  3|{"a": true, "b":9...|  [[b, c]]|
+---+--------------------+----------+

Let's parse your salesRank column: I'll turn each JSON key into a column of the DataFrame and keep your categories column.

from pyspark.sql import functions as F
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Declare the type of each JSON value here:
# `a` is read as a string, `b` as a double and `c` as a string
schema = StructType(
    [
        StructField('a', StringType(), True),
        StructField('b', DoubleType(), True),
        StructField('c', StringType(), True),
    ]
)



df.withColumn("salesRank_expanded", from_json("salesRank", schema)) \
    .select(F.col('id'), F.col('salesRank'), F.col('salesRank_expanded.*'),  # one column per JSON key
            F.col('categories'), F.col('categories').getItem(0).getItem(0).alias('key')) \
    .show()


+---+--------------------+-----+----+----+----------+---+
| id|           salesRank|    a|   b|   c|categories|key|
+---+--------------------+-----+----+----+----------+---+
|  1|{"a": true, "b":9...| true|98.0|xxx1|  [[a, b]]|  a|
|  2|{"a": false, "b":...|false|98.0|xxx2|  [[c, a]]|  c|
|  3|{"a": true, "b":9...| true|99.0|xxx3|  [[b, c]]|  b|
+---+--------------------+-----+----+----+----------+---+

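So I've turned your JSON keys into columns and extracted the first element of your categories as key. Now we just need a column that holds, for each row, the value of the column named in key.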

For this, I referred to this SO answer.

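from pyspark.sql.functions import col, lit, when
from functools import reduce

data_cols = ['a','b','c']  # all keys of your JSON dict

# Chain one when(key == x, col(x)) per key; rows whose key matches
# none of them fall through to lit(None), i.e. null
out = reduce(
    lambda acc, x: when(col("key") == x, col(x)).otherwise(acc),
    data_cols,
    lit(None)
)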

df.withColumn("salesRank_expanded", from_json("salesRank", schema)) \
    .select(F.col('id'), F.col('salesRank'), F.col('salesRank_expanded.*'),
            F.col('categories'), F.col('categories').getItem(0).getItem(0).alias('key')) \
    .withColumn('value', out) \
    .show()

The final output looks like this, with the value of the required JSON key for each row in the value column:

+---+--------------------+-----+----+----+----------+---+-----+
| id|           salesRank|    a|   b|   c|categories|key|value|
+---+--------------------+-----+----+----+----------+---+-----+
|  1|{"a": true, "b":9...| true|98.0|xxx1|  [[a, b]]|  a| true|
|  2|{"a": false, "b":...|false|98.0|xxx2|  [[c, a]]|  c| xxx2|
|  3|{"a": true, "b":9...| true|99.0|xxx3|  [[b, c]]|  b| 99.0|
+---+--------------------+-----+----+----+----------+---+-----+
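To see what `out` actually is, the same `reduce` can be run over plain strings instead of Column objects. This is a pure-Python analogy, not PySpark: the helper `when_sql` is hypothetical and only renders each `when(...).otherwise(...)` step as SQL text. It shows that the last key in `data_cols` becomes the outermost CASE WHEN and that the `lit(None)` seed is the innermost fallback, so a row whose key matches none of the keys gets null.

```python
from functools import reduce

data_cols = ['a', 'b', 'c']  # same key list as in the answer

def when_sql(key, acc):
    # Hypothetical helper: SQL text for when(col("key") == key, col(key)).otherwise(acc)
    return f"CASE WHEN key = '{key}' THEN {key} ELSE {acc} END"

# Same fold as `out`: start from NULL (the lit(None) seed) and wrap
# one CASE WHEN around the accumulated expression per key
sql = reduce(lambda acc, x: when_sql(x, acc), data_cols, "NULL")
print(sql)
# CASE WHEN key = 'c' THEN c ELSE CASE WHEN key = 'b' THEN b ELSE CASE WHEN key = 'a' THEN a ELSE NULL END END END
```

The nesting order does not change the result here: the branches test equality against distinct keys, so at most one of them can fire for any row.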

Edit

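It turns out there is a way to make this independent of the schema: the function get_json_object reads a value straight out of the JSON string, so no StructType is needed (the keys are still listed in data_cols). It returns the matched value as a string, which is why row 3 shows 99 instead of 99.0 below.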

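from pyspark.sql import functions as F
from pyspark.sql.functions import get_json_object

# Same fold as before, but each branch reads the value directly from the
# JSON string with a JSONPath such as "$.a", so no schema is needed
out2 = reduce(
    lambda acc, x: when(col("key") == x, get_json_object(F.col('salesRank'), f"$.{x}")).otherwise(acc),
    data_cols,
    lit(None)
)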


df.select(F.col('id'), F.col('salesRank'), F.col('categories'),
          F.col('categories').getItem(0).getItem(0).alias('key')) \
    .withColumn('value', out2) \
    .show()


+---+--------------------+----------+---+-----+
| id|           salesRank|categories|key|value|
+---+--------------------+----------+---+-----+
|  1|{"a": true, "b":9...|  [[a, b]]|  a| true|
|  2|{"a": false, "b":...|  [[c, a]]|  c| xxx2|
|  3|{"a": true, "b":9...|  [[b, c]]|  b|   99|
+---+--------------------+----------+---+-----+
