scala - Selecting specific rows per group from a Spark DataFrame
Question
I have a DataFrame like this:
+-----------------+------------------+-----------+--------+---+
| conversation_id | message_body | timestamp | sender | |
+-----------------+------------------+-----------+--------+---+
| A | hi | 9:00 | John | |
| A | how are you? | 10:00 | John | |
| A | can we meet? | 10:05 | John | * |
| A | not bad | 10:30 | Steven | * |
| A | great | 10:40 | John | |
| A | yeah, let's meet | 10:35 | Steven | |
| B | Hi | 12:00 | Anna | * |
| B | Hello | 12:05 | Ken | * |
+-----------------+------------------+-----------+--------+---+
For each conversation, I want the last message of the first sender's opening block and the first message from the second sender. I have marked them with asterisks.
One idea I had is to assign 0 to the first user and 1 to the second user.
Ideally, I would like to end up with something like this:
+-----------------+---------+------------+--------------+---------+------------+----------+
| conversation_id | sender1 | timestamp1 | message1 | sender2 | timestamp2 | message2 |
+-----------------+---------+------------+--------------+---------+------------+----------+
| A | John | 10:05 | can we meet? | Steven | 10:30 | not bad |
| B | Anna | 12:00 | Hi | Ken | 12:05 | Hello |
+-----------------+---------+------------+--------------+---------+------------+----------+
How can I do this in Spark?
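The 0/1-numbering idea above can be modelled in plain Scala collections, indexing senders by order of first appearance (just a sketch of the idea, not Spark code):

```scala
// Number senders by order of first appearance:
// first sender -> 0, second sender -> 1.
val senders = Seq("John", "John", "John", "Steven", "John", "Steven")
val index   = senders.distinct.zipWithIndex.toMap   // Map(John -> 0, Steven -> 1)
val labeled = senders.map(index)
println(labeled)   // List(0, 0, 0, 1, 0, 1)
```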
Solution
An interesting problem. A few adjustments were made to the data:
- The timestamp 10:35 was changed to 10:45
- Leading-zero format is used, e.g. 09:00 instead of 9:00
- You will need to use your own data types accordingly; this just demonstrates the required approach
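The leading-zero point matters because the timestamps here are plain strings, which sort lexicographically rather than chronologically; a quick sketch:

```scala
// Unpadded hour: '9' > '1' as a character, so "9:00" sorts after "10:00".
println(List("9:00", "10:00", "10:30").sorted)    // List(10:00, 10:30, 9:00)

// Zero-padded hours sort in true chronological order.
println(List("09:00", "10:00", "10:30").sorted)   // List(09:00, 10:00, 10:30)
```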
Done in a Databricks notebook:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("A", "hi", "09:00", "John"),
  ("A", "how are you?", "10:00", "John"),
  ("A", "can we meet?", "10:05", "John"),
  ("A", "not bad", "10:30", "Steven"),
  ("A", "great", "10:40", "John"),
  ("A", "yeah, let's meet", "10:45", "Steven"),
  ("B", "Hi", "12:00", "Anna"),
  ("B", "Hello", "12:05", "Ken")
).toDF("conversation_id", "message_body", "timestampX", "sender")

// Get rank; rank 1 is whoever initiates the conversation, and the other values
// can be used to infer relationships. Note: no @transient required here with Window.
val df2 = df.withColumn("rankC", row_number().over(Window.partitionBy($"conversation_id").orderBy($"timestampX".asc)))

// A minimum rank value <> 1 is the first message of the second sender.
// The row one rank before it is the last message of the first sender's "block".
// Rename columns to avoid the pipelined-renaming issues that occur.
val df3 = df2.select('conversation_id as "df3_conversation_id", 'sender as "df3_sender", 'rankC as "df3_rank")
val df3a = df3.groupBy("df3_conversation_id", "df3_sender").agg(min("df3_rank") as "rankC2").filter("rankC2 != 1")

// JOIN the values with some smarts. Some odd errors occur in Spark through pipelining;
// the pipelined row_number/ranking columns need to be dropped.
val df4  = df3a.join(df2, (df3a("df3_conversation_id") === df2("conversation_id")) && (df3a("rankC2") === df2("rankC") + 1)).drop("rankC").drop("rankC2")
val df4a = df3a.join(df2, (df3a("df3_conversation_id") === df2("conversation_id")) && (df3a("rankC2") === df2("rankC"))).drop("rankC").drop("rankC2")

// Then get the other missing data. This could all have been combined, but it is done
// in steps for simplicity. A simple final JOIN and you have the answer.
val df5 = df4.join(df4a, df4("df3_conversation_id") === df4a("df3_conversation_id"))
df5.show(false)
Returns:
The output is not fully formatted here; run it in a REPL to see the column headers.
|B |Ken |B |Hi |12:00 |Anna |B |Ken |B |Hello |12:05 |Ken |
|A |Steven |A |can we meet?|10:05 |John |A |Steven |A |not bad |10:30 |Steven|
You can manipulate the data further; the heavy lifting is now done. The Catalyst Optimizer has some issues with compilation and so on, which is why I worked in this way.
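As a sanity check on the pipeline, the same selection logic can be modelled with plain Scala collections. The Msg case class and firstExchange helper below are hypothetical, and zero-padded timestamp strings are assumed:

```scala
// Per conversation, order by timestamp and find the first row whose sender differs
// from the first sender: that row is the second sender's first message, and the row
// just before it is the last message of the first sender's opening block.
case class Msg(conversationId: String, body: String, timestamp: String, sender: String)

def firstExchange(msgs: Seq[Msg]): Map[String, (Msg, Msg)] =
  msgs.groupBy(_.conversationId).flatMap { case (cid, ms) =>
    val ordered = ms.sortBy(_.timestamp)              // assumes zero-padded HH:mm strings
    val first   = ordered.head.sender
    val idx     = ordered.indexWhere(_.sender != first)
    if (idx > 0) Some(cid -> (ordered(idx - 1), ordered(idx))) else None
  }

val data = Seq(
  Msg("A", "hi", "09:00", "John"),
  Msg("A", "how are you?", "10:00", "John"),
  Msg("A", "can we meet?", "10:05", "John"),
  Msg("A", "not bad", "10:30", "Steven"),
  Msg("A", "great", "10:40", "John"),
  Msg("A", "yeah, let's meet", "10:45", "Steven"),
  Msg("B", "Hi", "12:00", "Anna"),
  Msg("B", "Hello", "12:05", "Ken")
)

val result = firstExchange(data)
// result("A") pairs "can we meet?" (10:05, John) with "not bad" (10:30, Steven)
// result("B") pairs "Hi" (12:00, Anna) with "Hello" (12:05, Ken)
```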