scala - Spark - 按用户 ID 减少输入文件
问题描述
我正在使用包含 userId、seqId、eventType 和国家/地区的结构化输入文件。在按 seqId 排序后,我需要通过 userId 取每个字段的最后一个非空值来减少它。对于给定的输入:
userId seqId eventType country
A1600001 2 Update JP
A1600001 3 Update
B2301001 2 Update CH
A1600001 1 Create CH
C1200011 2 Update
C1200011 1 Create IN
减少的结果应该是:
A1600001 3 Update JP
C1200011 2 Update IN
B2301001 2 Update CH
我从以下内容开始:
scala> val file = sc.textFile("/tmp/sample-events.tsv")
scala> val lines = file.map( x => (x.split("\t")(0), x) )
scala> lines.foreach(x => println(x))
(A1600001,A1600001 2 Update JP)
(A1600001,A1600001 3 Update )
(B2301001,B2301001 2 Update CH)
(A1600001,A1600001 1 Create CH)
(C1200011,C1200011 2 Update )
(C1200011,C1200011 1 Create IN)
现在我想要reduceByKey
线条(我猜?),但我对这个主题很陌生,我不知道如何构造归约函数。有人可以帮忙吗?
解决方案
使用 spark-sql 和窗口函数。
scala> val df = Seq(("A1600001",2,"Update","JP"),("A1600001",3,"Update",""),("B2301001",2,"Update","CH"),("A1600001",1,"Create","CH"),("C1200011",2,"Update",""),("C1200011",1,"Create","IN")).toDF("userId","seqId","eventType","country")
df: org.apache.spark.sql.DataFrame = [userId: string, seqId: int ... 2 more fields]
scala> df.createOrReplaceTempView("samsu")
scala> spark.sql(""" with tb1(select userId, seqId, eventType, country, lag(country) over(partition by userid order by seqid) lg1, row_number() over(partition by userid order by seqid) rw1,co
unt(*) over(partition by userid) cw1 from samsu) select userId, seqId, eventType,case when country="" then lg1 else country end country from tb1 where rw1=cw1 """).show(false)
+--------+-----+---------+-------+
|userId |seqId|eventType|country|
+--------+-----+---------+-------+
|A1600001|3 |Update |JP |
|C1200011|2 |Update |IN |
|B2301001|2 |Update |CH |
+--------+-----+---------+-------+
scala>
推荐阅读
- javascript - 如何在javascript中构建带有详细信息标签的树结构
- sql - 奇怪的 Oracle Apex 错误,不存在的错字
- css - 如何让 CSS 轮廓走投无路?
- node.js - 消费者有没有办法向使用 kafka(kafka.js) 和 node.js 实现的生产者发送确认?
- javascript - 如何从角度范围之外重新加载我的 angluarjs 自定义指令?
- micronaut - 如何在不从 Micronaut 重新启动应用程序的情况下查看我的更改?
- javascript - Primevue Textarea 没有样式
- vba - VBA嵌套循环 - 外循环不跳转到下一个值
- testing - 在 Gitlab CI/CD 中使用不同的依赖版本进行测试
- laravel - 如何使用 LARAVEL SQL 将 String 转换为 INT?