apache-spark - spark从变量字段中为withColumn函数分配列名
问题描述
我有一些像下面这样的 json 数据,我需要根据一些 Jason 值创建新列
{ "start": "1234567679", "test": ["abc"], "value": 324, "end": "1234567689" }
{ "start": "1234567679", "test": ["xyz"], "value": "Near", "end": "1234567689"}
{ "start": "1234568679", "test": ["pqr"], "value": ["Attr"," "], "end":"1234568679"}
{ "start": "1234568997", "test": ["mno"], "value": ["{\"key\": \"1\", \"value\": [\"789\"]}" ], "end": "1234568999"}
以上是json示例
我想创建一个如下所示的列
start abc xyz pqr mno end
1234567679 324 null null null 1234567689
1234567889 null Near null null 1234567989
1234568679 null null attr null 1234568679
1234568997 null null null 789 1234568999
def getValue1(s1: Seq[String], v: String) = {
if (s1(0)=="abc")) v else null
}
def getValue2(s1: Seq[String], v: String) = {
if (s1(0)=="xyz")) v else null
}
val df = spark.read.json("path to json")
val tdf = df.withColumn("abc",getValue1($"test", $"value")).withColumn("xyz",getValue2($"test", $"value"))
但是我不想使用这个,因为我的测试值更多,我想要一些函数做这样的事情
def getColumnname(s1: Seq[String]) = {
return s1(0)
}
val tdf = df.withColumn(getColumnname($"test"),$"value"))
将值更改为列是否是个好主意,我想要这样,因为我需要将它应用于一些需要普通列的机器学习代码
解决方案
您可以使用枢轴操作来执行此类操作。假设您的数组中始终有一项作为test
列,这是更简单的解决方案;
import org.apache.spark.sql.functions._
val df = sqlContext.read.json("<yourPath>")
df.withColumn("test", $"test".getItem(0)).groupBy($"start", $"end").pivot("test").agg(first("value")).show
+----------+----------+----+----+
| start| end| abc| xyz|
+----------+----------+----+----+
|1234567679|1234567689| 324|null|
|1234567889|1234567689|null| 789|
+----------+----------+----+----+
如果test
列中有多个值,也可以使用explode
函数;
df.withColumn("test", explode($"test")).groupBy($"start", $"end").pivot("test").agg(first("value")).show
了解更多信息:
我希望它有帮助!
更新一
根据您的评论和更新的问题,这是您需要遵循的解决方案。我有意将所有操作分开,因此您可以轻松了解进一步改进需要做什么;
df.withColumn("value", regexp_replace($"value", "\\[", "")). //1
withColumn("value", regexp_replace($"value", "\\]", "")). //2
withColumn("value", split($"value", "\\,")). //3
withColumn("test", explode($"test")). //4
withColumn("value", explode($"value")). //5
withColumn("value", regexp_replace($"value", " +", "")). //6
filter($"value" !== ""). //7
groupBy($"start", $"end").pivot("test"). //8
agg(first("value")).show //9
当您阅读此类 json 文件时,它会为您提供一个数据框,其中
value
包含StringType
. 您不能直接转换StringType
为ArrayType
,因此您需要执行第 1、2、3 行中的一些技巧将其转换为ArrayType
. 您可以在一行中或仅使用一个正则表达式或定义 udf 来执行这些操作。这完全取决于你,我只是想向你展示 Apache Spark 的能力。现在你
value
有了ArrayType
. 就像我们在第 4 行中对列所做的那样,在第 5 行中分解此test
列。然后应用您的旋转操作。
推荐阅读
- git - 如何撤消“git add --intent-to-add”
- swift - SwiftUI 选择器为所选项目和选择视图分隔文本
- python - AttributeError:“SpooledTemporaryFile”对象没有属性“resize”
- php - 在 Laravel 中执行之前的 Mysql 查询拦截
- node.js - 如何在消息中发送某个(例如第二次或第三次)提及
- amazon-web-services - 无法使用密钥对进行 SSH
- unity3d - Photon kick 从游戏中手动添加玩家
- node.js - MongoDB查询以获取所有文档的数组字段长度的总和
- c# - Couchbase:为什么从 Couchbase SDK 检索时任何时区都会自动转换为本地时区
- python - 在张量流中找到两个边界框的交集?