首页 > 解决方案 > spark从变量字段中为withColumn函数分配列名

问题描述

我有一些像下面这样的 json 数据,我需要根据一些 Jason 值创建新列

{ "start": "1234567679", "test": ["abc"], "value": 324, "end": "1234567689" }

{ "start": "1234567679", "test": ["xyz"], "value": "Near", "end": "1234567689"}

{ "start": "1234568679", "test": ["pqr"], "value": ["Attr"," "], "end":"1234568679"}  

{ "start": "1234568997", "test": ["mno"], "value": ["{\"key\": \"1\", \"value\": [\"789\"]}" ], "end": "1234568999"} 

以上是json示例

我想创建一个如下所示的列

 start      abc     xyz    pqr     mno    end
 1234567679 324     null   null    null   1234567689
 1234567889 null    Near   null    null   1234567989
 1234568679 null    null   attr    null   1234568679
 1234568997 null    null   null    789    1234568999
def getValue1(s1: Seq[String], v: String) = {
     if (s1(0)=="abc"))  v else null
} 
 
def getValue2(s1: Seq[String], v: String) = {
    if (s1(0)=="xyz"))  v else null
}  

val df = spark.read.json("path to json")

val tdf = df.withColumn("abc",getValue1($"test", $"value")).withColumn("xyz",getValue2($"test", $"value"))

但是我不想使用这个,因为我的测试值更多,我想要一些函数做这样的事情

def getColumnname(s1: Seq[String]) = {
    return s1(0)
}  

val tdf = df.withColumn(getColumnname($"test"),$"value"))

将值更改为列是否是个好主意,我想要这样,因为我需要将它应用于一些需要普通列的机器学习代码

标签: apache-sparkapache-spark-sql

解决方案


您可以使用枢轴操作来执行此类操作。假设您的数组中始终有一项作为test列,这是更简单的解决方案;

import org.apache.spark.sql.functions._
val df = sqlContext.read.json("<yourPath>")
df.withColumn("test", $"test".getItem(0)).groupBy($"start", $"end").pivot("test").agg(first("value")).show
+----------+----------+----+----+
|     start|       end| abc| xyz|
+----------+----------+----+----+
|1234567679|1234567689| 324|null|
|1234567889|1234567689|null| 789|
+----------+----------+----+----+

如果test列中有多个值,也可以使用explode函数;

df.withColumn("test", explode($"test")).groupBy($"start", $"end").pivot("test").agg(first("value")).show

了解更多信息:

我希望它有帮助!

更新一

根据您的评论和更新的问题,这是您需要遵循的解决方案。我有意将所有操作分开,因此您可以轻松了解进一步改进需要做什么;

df.withColumn("value", regexp_replace($"value", "\\[", "")). //1
   withColumn("value", regexp_replace($"value", "\\]", "")). //2
   withColumn("value", split($"value", "\\,")).              //3
   withColumn("test", explode($"test")).                     //4
   withColumn("value", explode($"value")).                   //5
   withColumn("value", regexp_replace($"value", " +", "")).  //6
   filter($"value" !== "").                                  //7
   groupBy($"start", $"end").pivot("test").                  //8
   agg(first("value")).show                                  //9
  • 当您阅读此类 json 文件时,它会为您提供一个数据框,其中value包含StringType. 您不能直接转换StringTypeArrayType,因此您需要执行第 1、2、3 行中的一些技巧将其转换为ArrayType. 您可以在一行中或仅使用一个正则表达式或定义 udf 来执行这些操作。这完全取决于你,我只是想向你展示 Apache Spark 的能力。

  • 现在你value有了ArrayType. 就像我们在第 4 行中对列所做的那样,在第 5 行中分解此test列。然后应用您的旋转操作。


推荐阅读