首页 > 解决方案 > 展平地图spark scala 中的列

问题描述

下面是我的源架构。

root
 |-- header: struct (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- honame: string (nullable = true)
 |-- device: struct (nullable = true)
 |    |-- srcId: string (nullable = true)
 |    |-- srctype.: string (nullable = true)
 |-- ATTRIBUTES: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- event_date: date (nullable = true)
 |-- event_datetime: string (nullable = true)

我想分解 ATTRIBUTES 映射类型列并选择所有以 _id 结尾的列。

我使用下面的代码。

val exploded = batch_df.select($"event_date", explode($"ATTRIBUTES")).show()

我得到以下示例输出。

---+----------+--------------------+--------------------+
|date       |                 key|               value|
+----------+--------------------+--------------------+
|2021-05-18|SYST_id             |                  85|
|2021-05-18|RECVR_id            |                   1|
|2021-05-18|Account_Id|         |               12345|
|2021-05-18|Vb_id               |                 845|
|2021-05-18|SYS_INFO_id         |                 640|
|2021-05-18|mem_id              |                 456|
------------------------------------------------------

但是,我需要的输出如下。

+---+-------+--------------+-----------+------------+-------+-------------+-------+
|date       | SYST_id      |  RECVR_id | Account_Id | Vb_id |  SYS_INFO_id| mem_id|
+----+------+--------------+-----------+------------+-------+-------------+-------+
|2021-05-18|  85           |  1        |   12345    |  845  |     640     | 456   |
+-----------+--------------+-----------+------------+-------+-------------+-------+

有人可以帮忙吗?

标签: scalaapache-spark

解决方案


你的方法有效。您只需要在pivot之后添加一个操作explode

import org.apache.spark.sql.functions._

exploded.groupBy("date").pivot("key").agg(first("value")).show()

我假设dateand的组合key是唯一的,因此first在聚合中取(并且唯一的)值是安全的。如果组合不是唯一的,则可以collect_list用作聚合函数。


编辑:

要添加scrIdand srctype,只需将这些列添加到select语句中:

val exploded = batch_df.select($"event_date", $"device.srcId", $"device.srctype", explode($"ATTRIBUTES"))

要减少操作后的列数,请在聚合之前pivot对列应用过滤器:key

val relevant_cols = Array("Account_Id", "Vb_id", "RECVR_id", "mem_id") // the four additional columns

exploded.filter($"key".isin(relevant_cols:_*).or($"key".endsWith(lit("_split"))))
    .groupBy("date").pivot("key").agg(first("value")).show()

推荐阅读