首页 > 解决方案 > 用 Spark Scala 替换 campaign_id 列的缺失值

问题描述

我有一个具有以下结构的 json 文件:

root
 |-- header: struct (nullable = true)
 |    |-- version: integer (nullable = true)
 |    |-- makerId: string (nullable = true)
 |    |-- envId: integer (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- creationTime: string (nullable = true)
 |-- body: struct (nullable = true)
 |    |-- playerid: string (nullable = true)
 |    |-- hostname: string (nullable = true)
 |    |-- playertype: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- campaignid: string (nullable = true)
 |    |-- campaignname: string (nullable = true)
 |    |-- duration: integer (nullable = true)
 |    |-- creativeid: string (nullable = true)
 |    |-- frameid: string (nullable = true)
 |    |-- status: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- ok: string (nullable = true)
 |    |    |    |-- ko: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- filename: string (nullable = true)
 |    |    |    |-- hash: string (nullable = true)
 |    |    |    |-- timestamp: string (nullable = true)
 |    |-- condition: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |    |    |-- origin: string (nullable = true)
 |    |-- context: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)

现在我想用常量值 24737468 替换 campaignid 对象的缺失值,但我无法使用 map 函数或 na.fill()

请问你能帮帮我吗?

import model.domain.PlayLog

import org.apache.spark.sql.Encoders


object Hait {

  /** Loads the raw play-log JSON as a typed Dataset and replaces missing
    * `body.campaignid` values with the constant "24737468".
    *
    * The original attempt (`select("body.campaignid").map(r => "24737468")`)
    * produced a Dataset[String] containing only the constant and discarded
    * every other column; instead we rebuild the nested `body` struct in place.
    */
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession
      .builder()
      .appName("Spark Load JSON")
      .config("spark.master", "local")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    // Derive the read schema from the case class so Spark does not infer it.
    val schema = Encoders.product[PlayLog].schema
    // NOTE(review): the original path contained a stray space ("src/main /ressources");
    // "ressources" may also be a typo for "resources" — confirm the actual directory name.
    val rawpop = spark.read
      .format("json")
      .schema(schema)
      .load("src/main/ressources/raw_pop.json")
      .as[PlayLog]

    // Rebuild the body struct, substituting the default campaign id only when
    // campaignid is null; all other fields are carried through unchanged.
    val final_df = rawpop.withColumn(
      "body",
      struct(
        $"body.playerid",
        $"body.hostname",
        $"body.playertype",
        $"body.timestamp",
        coalesce($"body.campaignid", lit("24737468")).as("campaignid"),
        $"body.campaignname",
        $"body.duration",
        $"body.creativeid",
        $"body.frameid",
        $"body.status",
        $"body.media",
        $"body.condition",
        $"body.context"
      )
    )
    final_df.show(false)
  }
}

标签: apache-spark

解决方案


我试图用简化的 json 重现这个问题:

{"header": {"version": 1}, "body": {"playerid": 101, "campaignid": null}}
{"header": {"version": 2}, "body": {"playerid": 102, "campaignid": "a"}}
{"header": {"version": 3}, "body": {"playerid": 103, "campaignid": "b"}}

我还创建了案例类以将其转换为数据集

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions._

// Simplified mirror of the real schema, keeping only the fields relevant
// to the null-replacement problem (header.version, body.playerid, body.campaignid).
case class Header(version: Int)
case class Body(playerid: Int, campaignid: String)
case class TestJson(header: Header, body: Body)

// Derive the read schema from the case class so Spark does not have to infer it,
// then load the sample file as a strongly typed Dataset[TestJson].
// NOTE(review): assumes a SparkSession named `spark` is already in scope (REPL/notebook style).
val schema = Encoders.product[TestJson].schema
val rawpop = spark.read.format("json").schema(schema).load("/tmp/tests.json").as[TestJson]

rawpop看起来像这样

+------+-----------+
|header|       body|
+------+-----------+
|   [1]|     [101,]|
|   [2]|   [102, a]|
|   [3]|   [103, b]|
+------+-----------+

现在让我们替换 null 值

// Column expression that keeps campaignid as-is, falling back to the
// default string only when it is null.
val filledCampaign =
  when($"body.campaignid".isNull, "default_value").otherwise($"body.campaignid")
// Rebuild the nested body struct with the filled campaignid; playerid is untouched.
val result = rawpop.withColumn("body", struct($"body.playerid", filledCampaign))
result.show()

现在它显示

+------+--------------------+
|header|                body|
+------+--------------------+
|   [1]|[101, default_value]|
|   [2]|            [102, a]|
|   [3]|            [103, b]|
+------+--------------------+

推荐阅读