Spark - reduce input file by userId

Problem

I am working with a structured input file containing userId, seqId, eventType and country. I need to reduce it by userId, taking for each field the last non-empty value when the rows are ordered by seqId. For the given input:

userId    seqId eventType country
A1600001    2   Update  JP
A1600001    3   Update  
B2301001    2   Update  CH
A1600001    1   Create  CH
C1200011    2   Update  
C1200011    1   Create  IN

The reduced result should be:

A1600001    3   Update  JP
C1200011    2   Update  IN
B2301001    2   Update  CH

I started with the following:

scala> val file = sc.textFile("/tmp/sample-events.tsv")
scala> val lines = file.map( x => (x.split("\t")(0), x) )
scala> lines.foreach(x => println(x))
(A1600001,A1600001  2   Update  JP)
(A1600001,A1600001  3   Update  )
(B2301001,B2301001  2   Update  CH)
(A1600001,A1600001  1   Create  CH)
(C1200011,C1200011  2   Update  )
(C1200011,C1200011  1   Create  IN)

Now I want to reduceByKey the lines (I guess?), but I am new to this topic and I don't know how to construct the reduce function. Can someone help?

Tags: scala, apache-spark

Solution

Use Spark SQL with window functions.

scala> val df = Seq(
         ("A1600001",2,"Update","JP"),
         ("A1600001",3,"Update",""),
         ("B2301001",2,"Update","CH"),
         ("A1600001",1,"Create","CH"),
         ("C1200011",2,"Update",""),
         ("C1200011",1,"Create","IN")
       ).toDF("userId","seqId","eventType","country")
df: org.apache.spark.sql.DataFrame = [userId: string, seqId: int ... 2 more fields]

scala> df.createOrReplaceTempView("samsu")

scala> spark.sql("""
         with tb1 as (
           select userId, seqId, eventType, country,
                  lag(country) over (partition by userId order by seqId) lg1,
                  row_number() over (partition by userId order by seqId) rw1,
                  count(*) over (partition by userId) cw1
           from samsu
         )
         select userId, seqId, eventType,
                case when country = "" then lg1 else country end country
         from tb1
         where rw1 = cw1
       """).show(false)
+--------+-----+---------+-------+                                              
|userId  |seqId|eventType|country|
+--------+-----+---------+-------+
|A1600001|3    |Update   |JP     |
|C1200011|2    |Update   |IN     |
|B2301001|2    |Update   |CH     |
+--------+-----+---------+-------+
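
How it works: for each userId the query keeps the row whose row_number (rw1) equals the partition row count (cw1), i.e. the row with the highest seqId, and falls back to lag(country), the previous row's country, when country is empty. One caveat: lag only looks one row back, so a country that is empty in a user's last two events would not be recovered. A variant using last() with ignoreNulls expresses "last non-empty value" directly; here is a rough DataFrame-API sketch of that idea (my own illustration, untested, assuming the same df as above in spark-shell):

scala> import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._

// Window over each user's full partition, ordered by seqId.
scala> val w = Window.partitionBy("userId").orderBy("seqId")
         .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

scala> df.withColumn("country",
         when(col("country") === "", lit(null)).otherwise(col("country"))) // treat "" as null
       .select(
         col("userId"),
         max("seqId").over(w).as("seqId"),                          // highest seqId per user
         last("eventType", ignoreNulls = true).over(w).as("eventType"),
         last("country", ignoreNulls = true).over(w).as("country")  // last non-empty country
       )
       .distinct()
       .show(false)

Converting "" to null first lets ignoreNulls do the skipping; distinct() is safe here because the windowed columns are identical on every row of a user's partition.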


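To answer the original question literally, the same reduction can also be written with reduceByKey on the RDD. A minimal sketch (my own illustration; the Event case class and the merge rule are assumptions, and the input is taken to be tab-separated as in the question):

// Keep the record with the highest seqId per user,
// back-filling an empty country from the older record in each merge.
case class Event(seqId: Int, eventType: String, country: String)

val events = sc.textFile("/tmp/sample-events.tsv").map { line =>
  val f = line.split("\t", -1)   // limit -1 keeps trailing empty fields
  (f(0), Event(f(1).toInt, f(2), if (f.length > 3) f(3) else ""))
}

val reduced = events.reduceByKey { (a, b) =>
  val (newer, older) = if (a.seqId >= b.seqId) (a, b) else (b, a)
  if (newer.country.isEmpty) newer.copy(country = older.country) else newer
}

reduced.foreach { case (userId, e) =>
  println(s"$userId\t${e.seqId}\t${e.eventType}\t${e.country}")
}

The merge function is associative and commutative: it always prefers the record with the higher seqId and only consults the other record for a missing country, so reduceByKey can apply it in any order.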
