How to query a dictionary-format column in a PySpark DataFrame

Problem description

I have the following DataFrame:

  >>> df.printSchema()
  root
   |-- I: string (nullable = true)
   |-- F: string (nullable = true)
   |-- D: string (nullable = true)
   |-- T: string (nullable = true)
   |-- S: string (nullable = true)
   |-- P: string (nullable = true)

Column F is in dictionary format:

   {"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04",...}

I need to parse column F as shown below and create two new columns, P and N:

   P1 => "1:0.01"
   P2 => "3:0.03,4:0.04"
   and so on

 +--------+--------+-----------------+-----+------+--------+----+
 | I      |  P     | N               |  D  | T    | S      | P  |
 +--------+--------+-----------------+-----+------+--------+----+
 | i1     |  p1    | 1:0.01          |  d1 | t1   | s1     | p1 |
 | i1     |  p2    | 3:0.03,4:0.04   |  d1 | t1   | s1     | p1 |
 | i1     |  p3    | 3:0.03,4:0.04   |  d1 | t1   | s1     | p1 |
 | i2     |  ...   | ....            |  d2 | t2   | s2     | p2 |
 +--------+--------+-----------------+-----+------+--------+----+

Any suggestions for doing this in PySpark?

Tags: json, dataframe, dictionary, pyspark

Solution


Try this:

  1. The DataFrame you have
from pyspark.sql import functions as F

df = spark.createDataFrame([('id01', '{"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04"}')], ['I', 'F'])
df.printSchema()
df.show(truncate=False)

You can see the schema and data are the same as in your post:

root
 |-- I: string (nullable = true)
 |-- F: string (nullable = true)

+----+---------------------------------------------------------+
|I   |F                                                        |
+----+---------------------------------------------------------+
|id01|{"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04"}|
+----+---------------------------------------------------------+

  2. Process the string to separate the sub-dicts
# remove '{' and '}'
df = df.withColumn('array', F.regexp_replace('F', r'\{', ''))
df = df.withColumn('array', F.regexp_replace('array', r'\}', ''))

# replace the comma with '#' between each sub-dict so we can split on them
df = df.withColumn('array', F.regexp_replace('array', '","', '"#"' ))
df = df.withColumn('array', F.split('array', '#'))
df.show(truncate=False)

Here is the intermediate result:

+----+---------------------------------------------------------+-----------------------------------------------------------+
|I   |F                                                        |array                                                      |
+----+---------------------------------------------------------+-----------------------------------------------------------+
|id01|{"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04"}|["P1":"1:0.01", "P2":"3:0.03,4:0.04", "P3":"3:0.03,4:0.04"]|
+----+---------------------------------------------------------+-----------------------------------------------------------+
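As an aside, those four withColumn calls can be condensed into a single chained expression. This is just a compact sketch of the same step; like the original, it assumes the character '#' never occurs in the data:

# Same step in one expression: strip the braces, mark the entry
# boundaries with '#', then split on that marker.
df = df.withColumn(
    'array',
    F.split(
        F.regexp_replace(F.regexp_replace('F', r'[{}]', ''), '","', '"#"'),
        '#'))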

  3. Now generate one row for each sub-dict
# generate one row for each element in the array
df = df.withColumn('exploded', F.explode(df['array']))

# Need to distinguish ':' in the dict and in the value
df = df.withColumn('exploded', F.regexp_replace('exploded', '":"', '"#"' ))
df = df.withColumn('exploded', F.split('exploded', '#'))

# extract the name and value
df = df.withColumn('P', F.col('exploded')[0])
df = df.withColumn('N', F.col('exploded')[1])
df.select('I', 'exploded', 'P', 'N').show(truncate=False)

The final output:

+----+-----------------------+----+---------------+
|I   |exploded               |P   |N              |
+----+-----------------------+----+---------------+
|id01|["P1", "1:0.01"]       |"P1"|"1:0.01"       |
|id01|["P2", "3:0.03,4:0.04"]|"P2"|"3:0.03,4:0.04"|
|id01|["P3", "3:0.03,4:0.04"]|"P3"|"3:0.03,4:0.04"|
+----+-----------------------+----+---------------+
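Note that P and N still carry the double quotes from the JSON text, because the regex steps never removed them. If you want them gone, one more regexp_replace pass works (a small follow-up sketch):

# Strip the leftover JSON double quotes from P and N.
df = df.withColumn('P', F.regexp_replace('P', '"', ''))
df = df.withColumn('N', F.regexp_replace('N', '"', ''))
df.select('I', 'P', 'N').show(truncate=False)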
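Alternatively, since column F is itself valid JSON, you can skip the string surgery entirely: parse it with from_json using a MapType schema and explode the resulting map. This is a sketch assuming Spark 2.2+ (where from_json accepts a MapType); it also produces P and N without the leftover quotes:

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

# Start from the original two-column DataFrame.
df2 = spark.createDataFrame(
    [('id01', '{"P1":"1:0.01","P2":"3:0.03,4:0.04","P3":"3:0.03,4:0.04"}')],
    ['I', 'F'])

# Parse F as a string -> string map, then explode it: each map entry
# becomes one row, with the key and value aliased to P and N.
df2 = df2.withColumn('map', F.from_json('F', MapType(StringType(), StringType())))
df2.select('I', F.explode('map').alias('P', 'N')).show(truncate=False)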
