apache-spark - Replacing missing values in the campaign_id column with Spark Scala
Question
I have a JSON file with the following structure:
root
|-- header: struct (nullable = true)
| |-- version: integer (nullable = true)
| |-- makerId: string (nullable = true)
| |-- envId: integer (nullable = true)
| |-- id: string (nullable = true)
| |-- creationTime: string (nullable = true)
|-- body: struct (nullable = true)
| |-- playerid: string (nullable = true)
| |-- hostname: string (nullable = true)
| |-- playertype: string (nullable = true)
| |-- timestamp: string (nullable = true)
| |-- campaignid: string (nullable = true)
| |-- campaignname: string (nullable = true)
| |-- duration: integer (nullable = true)
| |-- creativeid: string (nullable = true)
| |-- frameid: string (nullable = true)
| |-- status: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- ok: string (nullable = true)
| | | |-- ko: string (nullable = true)
| |-- media: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- filename: string (nullable = true)
| | | |-- hash: string (nullable = true)
| | | |-- timestamp: string (nullable = true)
| |-- condition: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- name: string (nullable = true)
| | | |-- value: string (nullable = true)
| | | |-- origin: string (nullable = true)
| |-- context: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- key: string (nullable = true)
| | | |-- value: string (nullable = true)
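Now I want to replace the missing values of the campaignid field with the constant value 24737468, but I have not managed to do it with either the map function or na.fill().
Could you help me?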
import model.domain.PlayLog
import org.apache.spark.sql.Encoders
object Hait {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder()
      .appName("Spark Load JSON")
      .config("spark.master", "local")
      .getOrCreate()
    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._
    val schema = Encoders.product[PlayLog].schema
    val rawpop = spark.read.format("json").schema(schema).load("src/main/ressources/raw_pop.json").as[PlayLog]
    val miss_camp = rawpop.filter("body.campaignid is null")
    val final_df = miss_camp.select("body.campaignid").map(r => "24737468")
  }
}
Answer
I tried to reproduce the problem with a simplified JSON file:
{"header": {"version": 1}, "body": {"playerid": 101, "campaignid": null}}
{"header": {"version": 2}, "body": {"playerid": 102, "campaignid": "a"}}
{"header": {"version": 3}, "body": {"playerid": 103, "campaignid": "b"}}
I also created case classes so that it can be converted to a Dataset:
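import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions._
// Needed for .as[...] and $"..." outside spark-shell (spark is the active SparkSession)
import spark.implicits._
// Case classes mirroring the simplified JSON
case class Header(version: Int)
case class Body(playerid: Int, campaignid: String)
case class TestJson(header: Header, body: Body)
// Read the file with an explicit schema derived from the case class
val schema = Encoders.product[TestJson].schema
val rawpop = spark.read.format("json").schema(schema).load("/tmp/tests.json").as[TestJson]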
rawpop looks like this:
+------+-----------+
|header|       body|
+------+-----------+
|   [1]|     [101,]|
|   [2]|   [102, a]|
|   [3]|   [103, b]|
+------+-----------+
Now let's replace the null value:
// Rebuild the body struct; the alias keeps the field name campaignid
val result = rawpop.withColumn("body", struct($"body.playerid",
  when($"body.campaignid".isNull, "default_value").otherwise($"body.campaignid").as("campaignid")))
result.show()
Now it shows:
+------+--------------------+
|header|                body|
+------+--------------------+
|   [1]|[101, default_value]|
|   [2]|            [102, a]|
|   [3]|            [103, b]|
+------+--------------------+
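To tie this back to the question, here is a minimal sketch that fills campaignid with the constant 24737468 in two ways: a column-based variant using coalesce, and a typed variant using Dataset.map, which the question was attempting. The object name FillCampaignId, the helper fillCampaign and the in-memory sample rows are assumptions made for illustration; the case classes are the simplified ones from this answer, not the full PlayLog model.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, struct}

// Simplified case classes, as in the answer above (not the full PlayLog model).
case class Header(version: Int)
case class Body(playerid: Int, campaignid: String)
case class TestJson(header: Header, body: Body)

// Hypothetical object name, used only for this sketch.
object FillCampaignId {
  // Constant taken from the question.
  val DefaultCampaignId = "24737468"

  // Typed variant: a plain Scala function that can be passed to Dataset.map.
  // Option(null) is None, so getOrElse supplies the default only for missing values.
  def fillCampaign(row: TestJson): TestJson =
    row.copy(body = row.body.copy(
      campaignid = Option(row.body.campaignid).getOrElse(DefaultCampaignId)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Fill campaignid")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // In-memory stand-in for the simplified JSON file.
    val rawpop = Seq(
      TestJson(Header(1), Body(101, null)),
      TestJson(Header(2), Body(102, "a")),
      TestJson(Header(3), Body(103, "b"))
    ).toDS()

    // Column-based variant: rebuild the struct, replacing nulls via coalesce.
    // The alias keeps the field name, so the result still maps onto TestJson.
    val viaColumns = rawpop.withColumn("body", struct(
      $"body.playerid",
      coalesce($"body.campaignid", lit(DefaultCampaignId)).as("campaignid")
    )).as[TestJson]

    // Typed variant: map every row through fillCampaign.
    val viaMap = rawpop.map(fillCampaign)

    viaColumns.show()
    viaMap.show()
    spark.stop()
  }
}
```

With the full schema from the question, the struct-based variant has to list every field of body, so the typed map may be less verbose there. On Spark 3.1 or later, Column.withField should also allow replacing a single struct field without listing the others. As far as I can tell, na.fill() only targets top-level columns, which is probably why it had no effect on body.campaignid.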