首页 > 解决方案 > 数组列中的Scala DataFrame过滤器值

问题描述

我有一个名为 countriesDF 的 DataFrame,其架构(schema)如下:

 root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- area: double (nullable = true)
 |-- capital: string (nullable = true)
 |-- code: string (nullable = true)
 |-- gdp: double (nullable = true)
 |-- government: string (nullable = true)
 |-- independence: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- inflation: double (nullable = true)
 |-- name: string (nullable = true)
 |-- population: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- value: long (nullable = true)
 |    |    |-- year: long (nullable = true)
 |-- unemployment: double (nullable = true)

从 json 文件中导入

// Read countries.json into a DataFrame and mark it for caching so later queries can reuse it.
val countriesDF = spark.read.json("countries.json").cache()

countries.json:

{"_id":{"$oid":"581cb5a519ec2deb4ba71b3d"},"name":"France","code":"F","capital":"F-Île-de-France-Paris","area":547030,"gdp":2739000,"inflation":0.9,"unemployment":10.2,"independence":null,"government":"republic","population":[{"year":1946,"value":40502513},{"year":1954,"value":42777162},{"year":1962,"value":46520271},{"year":1968,"value":49778540},{"year":1975,"value":52591584},{"year":1982,"value":54334871},{"year":1990,"value":56615155},{"year":1999,"value":60185831},{"year":2006,"value":60825000},{"year":2011,"value":64933400}]}
{"_id":{"$oid":"581cb5a519ec2deb4ba71b40"},"name":"Spain","code":"E","capital":"E-Madrid-Madrid","area":504750,"gdp":1356000,"inflation":1.8,"unemployment":26.3,"independence":{"$date":"1492-01-01T00:00:00.000Z"},"government":"parliamentary monarchy","population":[{"year":1900,"value":18618086},{"year":1910,"value":19995686},{"year":1920,"value":21389842},{"year":1930,"value":23677794},{"year":1940,"value":25877971},{"year":1950,"value":27976755},{"year":1960,"value":30528539},{"year":1970,"value":34040657},{"year":1981,"value":37683363},{"year":1991,"value":38871331},{"year":2001,"value":40847371},{"year":2011,"value":46815916}]}
{"_id":{"$oid":"581cb5a519ec2deb4ba71b41"},"name":"Germany","code":"D","capital":"D-Berlin-Berlin","area":356910,"gdp":3593000,"inflation":1.6,"unemployment":5.3,"independence":{"$date":"1871-01-18T00:00:00.000Z"},"government":"federal republic","population":[{"year":1950,"value":68230796},{"year":1997,"value":82501000},{"year":2007,"value":82217837},{"year":2011,"value":80219695}]}
{"_id":{"$oid":"581cb5a519ec2deb4ba71b42"},"name":"Austria","code":"A","capital":"A-Wien-Wien","area":83850,"gdp":417900,"inflation":2.1,"unemployment":4.9,"independence":{"$date":"1918-11-12T00:00:00.000Z"},"government":"federal republic","population":[{"year":1869,"value":4497873},{"year":1910,"value":6648310},{"year":1934,"value":6760233},{"year":1951,"value":6933905},{"year":1961,"value":7073807},{"year":1971,"value":7491526},{"year":1981,"value":7555338},{"year":1991,"value":7795786},{"year":2001,"value":8032926},{"year":2010,"value":8375290},{"year":2013,"value":8499759}]}
{"_id":{"$oid":"581cb5a519ec2deb4ba71b43"},"name":"Czech Republic","code":"CZ","capital":"CZ-Praha-Praha","area":78703,"gdp":194800,"inflation":1.4,"unemployment":7.1,"independence":{"$date":"1993-01-01T00:00:00.000Z"},"government":"parliamentary democracy","population":[{"year":1950,"value":8876260},{"year":1960,"value":9577974},{"year":1970,"value":9827923},{"year":1980,"value":10313309},{"year":1990,"value":10325976},{"year":1997,"value":10321120},{"year":2000,"value":10250398},{"year":2001,"value":10230060},{"year":2011,"value":10562214}]}
{"_id":{"$oid":"581cb5a519ec2deb4ba71b47"},"name":"Liechtenstein","code":"FL","capital":"FL-Liechtenstein-Vaduz","area":160,"gdp":5113,"inflation":-0.7,"unemployment":2.3,"independence":{"$date":"1806-07-12T00:00:00.000Z"},"government":"hereditary constitutional monarchy","population":[{"year":1950,"value":13757},{"year":1960,"value":16628},{"year":1970,"value":21350},{"year":1980,"value":25215},{"year":1990,"value":29032},{"year":2001,"value":33525},{"year":2006,"value":35174},{"year":2012,"value":36636}]}
{"_id":{"$oid":"581cb5a519ec2deb4ba71b48"},"name":"Italy","code":"I","capital":"I-Lazio-Roma","area":301230,"gdp":2068000,"inflation":1.2,"unemployment":12.4,"independence":{"$date":"1861-03-17T00:00:00.000Z"},"government":"republic","population":[{"year":1861,"value":22182377},{"year":1871,"value":27303509},{"year":1881,"value":28953480},{"year":1901,"value":32965504},{"year":1911,"value":35845048},{"year":1921,"value":39943528},{"year":1931,"value":41651000},{"year":1936,"value":42943602},{"year":1951,"value":47515537},{"year":1961,"value":50623569},{"year":1971,"value":54136547},{"year":1981,"value":56556911},{"year":1991,"value":56778031},{"year":2001,"value":56995744},{"year":2011,"value":59433744}]}
{"_id":{"$oid":"581cb5a519ec2deb4ba71b49"},"name":"Slovenia","code":"SLO","capital":"SLO-Slovenia-Ljubljana","area":20256,"gdp":46820,"inflation":1.8,"unemployment":13.1,"independence":{"$date":"1991-06-25T00:00:00.000Z"},"government":"parliamentary republic","population":[{"year":1857,"value":1101854},{"year":1869,"value":1128768},{"year":1880,"value":1182223},{"year":1890,"value":1234056},{"year":1900,"value":1268055},{"year":1910,"value":1321098},{"year":1921,"value":1054919},{"year":1931,"value":1144298},{"year":1948,"value":1391873},{"year":1953,"value":1466425},{"year":1961,"value":1591523},{"year":1971,"value":1727137},{"year":1981,"value":1891864},{"year":1991,"value":1913355},{"year":2002,"value":1964036},{"year":2010,"value":2046976},{"year":2011,"value":2050189},{"year":2013,"value":2058821}]}
{"_id":{"$oid":"581cb5a519ec2deb4ba71b4e"},"name":"Ukraine","code":"UA","capital":"UA-Kyïv-Kyïv","area":603700,"gdp":175500,"inflation":0.7,"unemployment":8,"independence":{"$date":"1991-12-01T00:00:00.000Z"},"government":"republic","population":[{"year":1950,"value":37297652},{"year":1959,"value":41869046},{"year":1970,"value":47126517},{"year":1979,"value":49609333},{"year":1989,"value":51706742},{"year":2001,"value":48457102},{"year":2011,"value":45778534},{"year":2012,"value":45633637}]}
{"_id":{"$oid":"581cb5a519ec2deb4ba71b4f"},"name":"Russia","code":"R","capital":"R-Moscow-Moskva","area":17075200,"gdp":2113000,"inflation":6.8,"unemployment":5.8,"independence":{"$date":"1991-08-24T00:00:00.000Z"},"government":"federation","population":[{"year":1950,"value":102798657},{"year":1960,"value":120057109},{"year":1970,"value":130357806},{"year":1980,"value":138535754},{"year":1990,"value":148148752},{"year":1997,"value":148178487},{"year":2000,"value":146762881},{"year":2010,"value":142856536},{"year":2013,"value":143347059},{"year":2014,"value":143666931}]}

我需要获取每个国家/地区 2010 年的人口。

// Keep only the population entries whose year is exactly 2010; countries without
// such an entry end up with an empty array in the "pop" column.
val popIn2010 = expr("filter(population, x -> x.year == 2010)").alias("pop")
countriesDF
  .select(col("name"), popIn2010)
  .show()

这给了我一个结果

+--------------+-------------------+
|          name|                pop|
+--------------+-------------------+
|        France|                 []|
|         Spain|                 []|
|       Germany|                 []|
|       Austria|  [[8375290, 2010]]|
|Czech Republic|                 []|
| Liechtenstein|                 []|
|         Italy|                 []|
|      Slovenia|  [[2046976, 2010]]|
|       Ukraine|                 []|
|        Russia|[[142856536, 2010]]|
+--------------+-------------------+

对于没有 2010 年记录的国家/地区,我需要获取该日期之前的最新记录。我该怎么做?结果应如下所示:

+--------------+-------------------+
|          name|                pop|
+--------------+-------------------+
|        France|   [60825000, 2006]|
|         Spain|   [40847371, 2001]|
|       Germany|   [82217837, 2007]|
|       Austria|    [8375290, 2010]|
|Czech Republic|   [10230060, 2001]|
| Liechtenstein|      [35174, 2006]|
|         Italy|   [56995744, 2001]|
|      Slovenia|    [2046976, 2010]|
|       Ukraine|   [48457102, 2001]|
|        Russia|  [142856536, 2010]|
+--------------+-------------------+

标签: scala, apache-spark, apache-spark-sql

解决方案


首先将数据框分解为每年一行的格式

// One row per (country, population record): explode the array into rows, then
// promote the fields of the resulting "pop" struct to top-level columns.
val exploded = {
  val onePerRecord = countriesDF.select($"name", explode($"population").as("pop"))
  onePerRecord.select($"name", $"pop.*")
}

然后使用窗口函数,为每个国家取 2010 年及之前年份最新的一条记录

// Number each country's records from the most recent year downwards, then keep
// only the first one among the records dated 2010 or earlier.
val newestFirstPerCountry = Window.partitionBy($"name").orderBy($"year".desc)

exploded
  .where($"year" <= 2010)
  .withColumn("row", row_number().over(newestFirstPerCountry))
  .where($"row" === 1)
  .drop("row")
  .show()

输出:

+--------------+---------+----+
|          name|    value|year|
+--------------+---------+----+
|        Russia|142856536|2010|
|       Germany| 82217837|2007|
|        France| 60825000|2006|
|         Italy| 56995744|2001|
|         Spain| 40847371|2001|
| Liechtenstein|    35174|2006|
|       Ukraine| 48457102|2001|
|Czech Republic| 10230060|2001|
|      Slovenia|  2046976|2010|
|       Austria|  8375290|2010|
+--------------+---------+----+

如果您想要在单个列中包含年份和人口的原始格式,请在 .show() 之前添加 .select($"name", struct($"value", $"year") as "pop")

+--------------+-----------------+
|          name|              pop|
+--------------+-----------------+
|        Russia|[142856536, 2010]|
|       Germany| [82217837, 2007]|
|        France| [60825000, 2006]|
|         Italy| [56995744, 2001]|
|         Spain| [40847371, 2001]|
| Liechtenstein|    [35174, 2006]|
|       Ukraine| [48457102, 2001]|
|Czech Republic| [10230060, 2001]|
|      Slovenia|  [2046976, 2010]|
|       Austria|  [8375290, 2010]|
+--------------+-----------------+

编辑:或者,这很容易实现为 UDF

// UDF over a country's population array: returns the (value, year) entry with the
// latest year that is not after 2010.
//
// The result is an Option so that a country with no record at or before 2010, or
// with a null population array, yields a null struct instead of failing the job
// (maxBy on an empty collection throws UnsupportedOperationException).
val popudf = udf((population: Seq[Row]) =>
  Option(population)                 // a null array column arrives here as null
    .getOrElse(Seq.empty[Row])
    .collect {
      // Entries with a null value or year do not match the Long patterns and are skipped.
      case Row(pop: Long, year: Long) if year <= 2010 => (pop, year)
    }
    // Keep the earlier element on equal years, matching maxBy's first-wins behavior.
    .reduceOption((best, next) => if (next._2 > best._2) next else best))
// Apply the UDF to each country's population array and print name plus result.
val latestPop = popudf($"population").as("pop")
countriesDF.select($"name", latestPop).show()

产生相同的输出

+--------------+-----------------+
|          name|              pop|
+--------------+-----------------+
|        France| [60825000, 2006]|
|         Spain| [40847371, 2001]|
|       Germany| [82217837, 2007]|
|       Austria|  [8375290, 2010]|
|Czech Republic| [10230060, 2001]|
| Liechtenstein|    [35174, 2006]|
|         Italy| [56995744, 2001]|
|      Slovenia|  [2046976, 2010]|
|       Ukraine| [48457102, 2001]|
|        Russia|[142856536, 2010]|
+--------------+-----------------+

推荐阅读