scala - Manipulating data with Spark RDDs
Problem
I have the following sample data:
UserId,ProductId,Category,Action
1,111,Electronics,Browse
2,112,Fashion,Click
3,113,Kids,AddtoCart
4,114,Food,Purchase
5,115,Books,Logout
6,114,Food,Click
7,113,Kids,AddtoCart
8,115,Books,Purchase
9,111,Electronics,Click
10,112,Fashion,Purchase
3,112,Fashion,Click
I need to generate a list of users who are interested in either the "Fashion" category or the "Electronics" category, but not in both. A user counts as interested if he/she performed any of these actions: Click/AddtoCart/Purchase. This is the Spark/Scala code I have so far:
val rrd1 = sc.textFile("/user/harshit.kacker/datametica_logs.csv")
val rrd2 = rrd1.map(x => {
  val c = x.split(",")
  (c(0).toInt, x)
})
val rrd3 = rrd1.filter(x => x.split(",")(2) == "Fashion" || x.split(",")(2) == "Electronics")
val rrd4 = rrd3.filter(x => x.split(",")(3) == "Click" || x.split(",")(3) == "Purchase" || x.split(",")(3) == "AddtoCart")
rrd4.collect.foreach(println)
2,112,Fashion,Click
9,111,Electronics,Click
10,112,Fashion,Purchase
3,112,Fashion,Click
4,111,Electronics,Click
19,112,Fashion,Click
9,112,Fashion,Purchase
2,112,Fashion,Click
2,111,Electronics,Click
1,112,Fashion,Purchase
Now I have to work on the part in italics, "generate a list of users interested in the "Fashion" or "Electronics" category but not in both", and produce the desired output:
10,Fashion
3,Fashion
4,Electronics
19,Fashion
1,Fashion
Meaning that userIds that appear under both the Fashion and Electronics categories should be eliminated. How can I achieve this?
Solution
First, parse the input text file into tuples:
val srcPath = "/user/harshit.kacker/datametica_logs.csv"
// parse text file into tuples:
val rdd = spark.sparkContext.textFile(srcPath)
val rows = rdd.map(line => line.split(",")).map(row => (row(0), row(1), row(2), row(3)))
val header = rows.first
// drop the header:
val logs = rows.filter(row => row != header)
Filter the RDD by the interest criteria:
val interests = logs.filter(log =>
List("Click", "AddtoCart", "Purchase").contains(log._4)
)
Filter Fashion and Electronics separately:
val fashion = interests.filter(row => row._3 == "Fashion")
val electronics = interests.filter(row => row._3 == "Electronics")
Find the user IDs common to both Fashion and Electronics:
val fashionIds = fashion.map(_._1).distinct
val electronicsIds = electronics.map(_._1).distinct
val commonIds = fashionIds.intersection(electronicsIds).collect()
Combine the Fashion and Electronics rows and filter out the IDs common to both:
val finalRdd = (fashion ++ electronics)
.filter(log => !commonIds.contains(log._1))
.map(log => (log._1, log._3))
.distinct()
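Spark aside, the heart of this step is a symmetric difference over user IDs: collect the ID sets for each category, intersect them, then drop rows whose user appears in the intersection. A minimal plain-Scala sketch of the same logic (no cluster needed; the in-memory `logs` list mirrors the filtered sample rows and is purely illustrative):

```scala
// Filtered sample rows as (UserId, ProductId, Category, Action) tuples:
val logs = List(
  ("2", "112", "Fashion", "Click"),
  ("9", "111", "Electronics", "Click"),
  ("10", "112", "Fashion", "Purchase"),
  ("3", "112", "Fashion", "Click"),
  ("4", "111", "Electronics", "Click"),
  ("19", "112", "Fashion", "Click"),
  ("9", "112", "Fashion", "Purchase"),
  ("2", "112", "Fashion", "Click"),
  ("2", "111", "Electronics", "Click"),
  ("1", "112", "Fashion", "Purchase")
)

// Distinct user IDs per category, then the IDs present in both:
val fashionIds     = logs.collect { case (id, _, "Fashion", _) => id }.toSet
val electronicsIds = logs.collect { case (id, _, "Electronics", _) => id }.toSet
val commonIds      = fashionIds intersect electronicsIds

// Keep only rows whose user is NOT in both categories, as (UserId, Category):
val result = logs
  .filter(log => !commonIds.contains(log._1))
  .map(log => (log._1, log._3))
  .distinct
```

Here `commonIds` works out to users 2 and 9, and `result` contains exactly the five (UserId, Category) pairs in the desired output.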
Edit: using DataFrames
// using DataFrames:
import spark.implicits._
import org.apache.spark.sql.functions.when

val df = spark.read.option("header", "true").csv(srcPath)
// note: the Action value in the data is "AddtoCart", not "AddToCart"
val interestDf = df.where($"Action".isin("Click", "Purchase", "AddtoCart"))
val fashionDf = interestDf.where($"Category" === "Fashion")
val electronicsDf = interestDf.where($"Category" === "Electronics")
val joinDf = electronicsDf.alias("e").join(fashionDf.alias("f"), Seq("UserId"), "outer")
.where($"e.Category".isNull || $"f.Category".isNull)
val finalDf = joinDf.select(
  $"UserId",
  when($"e.Category".isNull, $"f.Category").otherwise($"e.Category").as("Category")
).distinct
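Another way to read the outer-join trick: keep users whose interest rows span exactly one distinct category. A plain-Scala sketch of that grouping idea (the `rows` pairs below are illustrative, taken from the filtered sample):

```scala
// Interest rows as (UserId, Category) pairs:
val rows = List(
  ("2", "Fashion"), ("9", "Electronics"), ("10", "Fashion"),
  ("3", "Fashion"), ("4", "Electronics"), ("19", "Fashion"),
  ("9", "Fashion"), ("2", "Fashion"), ("2", "Electronics"),
  ("1", "Fashion")
)

// Group by user; keep users with exactly one distinct category:
val exclusive = rows
  .groupBy(_._1)
  .collect { case (id, rs) if rs.map(_._2).distinct.size == 1 => (id, rs.head._2) }
  .toList
```

In DataFrame terms this roughly corresponds to grouping by UserId and keeping the groups where the distinct Category count is 1, which avoids the self-join entirely.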