scala - 展平地图spark scala 中的列
问题描述
下面是我的源架构。
root
|-- header: struct (nullable = true)
| |-- timestamp: long (nullable = true)
| |-- id: string (nullable = true)
| |-- honame: string (nullable = true)
|-- device: struct (nullable = true)
| |-- srcId: string (nullable = true)
| |-- srctype.: string (nullable = true)
|-- ATTRIBUTES: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- event_date: date (nullable = true)
|-- event_datetime: string (nullable = true)
我想分解 ATTRIBUTES 映射类型列并选择所有以 _id 结尾的列。
我使用下面的代码。
val exploded = batch_df.select($"event_date", explode($"ATTRIBUTES")).show()
我得到以下示例输出。
---+----------+--------------------+--------------------+
|date | key| value|
+----------+--------------------+--------------------+
|2021-05-18|SYST_id | 85|
|2021-05-18|RECVR_id | 1|
|2021-05-18|Account_Id| | 12345|
|2021-05-18|Vb_id | 845|
|2021-05-18|SYS_INFO_id | 640|
|2021-05-18|mem_id | 456|
------------------------------------------------------
但是,我需要的输出如下。
+---+-------+--------------+-----------+------------+-------+-------------+-------+
|date | SYST_id | RECVR_id | Account_Id | Vb_id | SYS_INFO_id| mem_id|
+----+------+--------------+-----------+------------+-------+-------------+-------+
|2021-05-18| 85 | 1 | 12345 | 845 | 640 | 456 |
+-----------+--------------+-----------+------------+-------+-------------+-------+
有人可以帮忙吗?
解决方案
你的方法有效。您只需要在pivot
之后添加一个操作explode
:
import org.apache.spark.sql.functions._
exploded.groupBy("date").pivot("key").agg(first("value")).show()
我假设date
and的组合key
是唯一的,因此first
在聚合中取(并且唯一的)值是安全的。如果组合不是唯一的,则可以collect_list
用作聚合函数。
编辑:
要添加scrId
and srctype
,只需将这些列添加到select
语句中:
val exploded = batch_df.select($"event_date", $"device.srcId", $"device.srctype", explode($"ATTRIBUTES"))
要减少操作后的列数,请在聚合之前pivot
对列应用过滤器:key
val relevant_cols = Array("Account_Id", "Vb_id", "RECVR_id", "mem_id") // the four additional columns
exploded.filter($"key".isin(relevant_cols:_*).or($"key".endsWith(lit("_split"))))
.groupBy("date").pivot("key").agg(first("value")).show()
推荐阅读
- jenkins - git checkouts 多次,尽管在 Jenkins 管道中只签出一次
- laravel - 如何将简单版本的 Vue 3 安装到 Laravel 8 项目中?
- web - 网站托管服务加载问题
- openiddict - OpenIdDict 在不使用降级模式的情况下扩展现有的程序流程,或者改为扩展现有的默认实现
- python-3.x - 如何在 s3 文件夹上搜索正则表达式匹配,并解析文件
- javascript - Angular 12 更新让我的字体看起来不一样了
- wikipedia - 如何使用 dbpedia 在维基百科上获取公司的结构化数据?
- maven - 迁移到 java 9 模块时无法使用 maven-jar-plugin 构建可执行 jar
- azure - Azure:无法从 ARM 部署 FortiGate - 错误“不支持提供‘未知’支付工具...”
- vb.net - 使用带有 VB.Net 的 swagger 接收 404 响应