Count rows in a DataFrame whose column value falls between the values of two columns in the current row, and add the count as a column

Problem description

Suppose I have a table with the following columns:

| user| event_start         | event_end           | show_start          | show_end            | show_name  |
------------------------------------------------------------------------------------------------------------
| 286 | 2018-06-12 00:00:19 | 2018-06-12 00:00:48 | 2018-06-12 00:00:00 | 2018-06-12 01:00:00 | foo        |
| 287 | 2018-06-12 00:00:45 | 2018-06-12 00:00:53 | 2018-06-12 00:00:00 | 2018-06-12 01:00:00 | foo        |
| 288 | 2018-06-12 00:00:47 | 2018-06-12 00:00:58 | 2018-06-12 00:00:00 | 2018-06-12 02:00:00 | bar        |
...

How can I add a new column to the table containing the count of distinct users whose event_start value lies between that row's show_start and show_end?

That way I would end up with a table like this:

| user| event_start         | event_end           | show_start          | show_end            | show_name  | active_users |
---------------------------------------------------------------------------------------------------------------------------
| 286 | 2018-06-12 00:00:19 | 2018-06-12 00:00:48 | 2018-06-12 00:00:00 | 2018-06-12 01:00:00 | foo        | 18           |
| 287 | 2018-06-12 00:00:45 | 2018-06-12 00:00:53 | 2018-06-12 00:00:00 | 2018-06-12 01:00:00 | foo        | 18           |
| 288 | 2018-06-12 00:00:47 | 2018-06-12 00:00:58 | 2018-06-12 00:00:00 | 2018-06-12 02:00:00 | bar        | 31           |
...

This will be used to compute, for each show, the share of users watching that show relative to the number of active users.

My intuition is that I probably need a window function, but I can't quite work out how to construct the required operations.

Tags: scala, apache-spark, apache-spark-sql

Solution


Based on the requirement clarified in the comments, it appears that what's needed is a lookup of active users against the full DataFrame for each distinct show. This could be expensive, especially if there are many distinct shows. Assuming the number of distinct shows isn't too large (i.e. small enough to be collect-ed), here's one approach:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import java.sql.Timestamp

// Rebuild the sample data (with a few extra rows) as a DataFrame
val df = Seq(
  (286, Timestamp.valueOf("2018-06-12 00:00:19"), Timestamp.valueOf("2018-06-12 00:00:48"),
        Timestamp.valueOf("2018-06-12 00:00:00"), Timestamp.valueOf("2018-06-12 01:00:00"), "foo"),
  (287, Timestamp.valueOf("2018-06-12 00:00:45"), Timestamp.valueOf("2018-06-12 00:00:53"),
        Timestamp.valueOf("2018-06-12 00:00:00"), Timestamp.valueOf("2018-06-12 01:00:00"), "foo"),
  (288, Timestamp.valueOf("2018-06-12 00:00:47"), Timestamp.valueOf("2018-06-12 00:00:58"),
        Timestamp.valueOf("2018-06-12 00:00:00"), Timestamp.valueOf("2018-06-12 02:00:00"), "bar"),
  (301, Timestamp.valueOf("2018-06-12 03:00:15"), Timestamp.valueOf("2018-06-12 03:00:45"),
        Timestamp.valueOf("2018-06-12 00:00:00"), Timestamp.valueOf("2018-06-12 02:00:00"), "bar"),
  (302, Timestamp.valueOf("2018-06-12 00:00:15"), Timestamp.valueOf("2018-06-12 00:00:30"),
        Timestamp.valueOf("2018-06-12 00:00:00"), Timestamp.valueOf("2018-06-12 02:00:00"), "bar"),
  (302, Timestamp.valueOf("2018-06-12 01:00:20"), Timestamp.valueOf("2018-06-12 01:00:50"),
        Timestamp.valueOf("2018-06-12 00:00:00"), Timestamp.valueOf("2018-06-12 02:00:00"), "bar"),
  (303, Timestamp.valueOf("2018-06-12 01:00:30"), Timestamp.valueOf("2018-06-12 01:00:45"),
        Timestamp.valueOf("2018-06-12 02:00:00"), Timestamp.valueOf("2018-06-12 03:00:00"), "gee")
).toDF("user", "event_start", "event_end", "show_start", "show_end", "show_name")

df.show
// +----+-------------------+-------------------+-------------------+-------------------+---------+
// |user|        event_start|          event_end|         show_start|           show_end|show_name|
// +----+-------------------+-------------------+-------------------+-------------------+---------+
// | 286|2018-06-12 00:00:19|2018-06-12 00:00:48|2018-06-12 00:00:00|2018-06-12 01:00:00|      foo|
// | 287|2018-06-12 00:00:45|2018-06-12 00:00:53|2018-06-12 00:00:00|2018-06-12 01:00:00|      foo|
// | 288|2018-06-12 00:00:47|2018-06-12 00:00:58|2018-06-12 00:00:00|2018-06-12 02:00:00|      bar|
// | 301|2018-06-12 03:00:15|2018-06-12 03:00:45|2018-06-12 00:00:00|2018-06-12 02:00:00|      bar|
// | 302|2018-06-12 00:00:15|2018-06-12 00:00:30|2018-06-12 00:00:00|2018-06-12 02:00:00|      bar|
// | 302|2018-06-12 01:00:20|2018-06-12 01:00:50|2018-06-12 00:00:00|2018-06-12 02:00:00|      bar|
// | 303|2018-06-12 01:00:30|2018-06-12 01:00:45|2018-06-12 02:00:00|2018-06-12 03:00:00|      gee|
// +----+-------------------+-------------------+-------------------+-------------------+---------+
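As a side note (not part of the original answer), the "small enough to collect" assumption can be checked cheaply before pulling the shows to the driver; a minimal sketch:

// Optional sanity check: the number of distinct shows should be small enough
// to collect to the driver.
df.select($"show_name", $"show_start", $"show_end").distinct.count
// 3 for the sample data above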

// Collect the distinct (show_name, show_start, show_end) triples to the driver
val showList = df.select($"show_name", $"show_start", $"show_end").
  distinct.collect

// For each distinct show, count the distinct users whose event_start falls
// within [show_start, show_end]
val showsListNew = showList.map( row => {
    val distinctCount = df.select(countDistinct(when($"event_start".between(
        row.getTimestamp(1), row.getTimestamp(2)
      ), $"user"))
    ).head.getLong(0)

    (row.getString(0), row.getTimestamp(1), row.getTimestamp(2), distinctCount)
  }
)
// showsListNew: Array[(String, java.sql.Timestamp, java.sql.Timestamp, Long)] = Array(
//   (gee, 2018-06-12 02:00:00.0, 2018-06-12 03:00:00.0, 0),
//   (bar, 2018-06-12 00:00:00.0, 2018-06-12 02:00:00.0, 5),
//   (foo, 2018-06-12 00:00:00.0, 2018-06-12 01:00:00.0, 4)
// )
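Note that the map above runs one Spark action per distinct show. Purely as a sketch (the countCols, countsRow and showsListNew2 names are mine, not from the original answer), the same counts could be computed in a single aggregation:

// Variant sketch: one conditional countDistinct per show, aliased by index so
// duplicate show names cannot collide, all evaluated in a single action.
val countCols = showList.zipWithIndex.map { case (row, i) =>
  countDistinct(when($"event_start".between(row.getTimestamp(1), row.getTimestamp(2)), $"user")).as(s"c$i")
}
val countsRow = df.agg(countCols.head, countCols.tail: _*).head

val showsListNew2 = showList.zipWithIndex.map { case (row, i) =>
  (row.getString(0), row.getTimestamp(1), row.getTimestamp(2), countsRow.getLong(i))
}

showDF below can then be built from showsListNew2 in exactly the same way.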

// Turn the per-show counts into a small lookup DataFrame and join it back
val showDF = sc.parallelize(showsListNew).toDF("show_name", "show_start", "show_end", "active_users")

df.join(showDF, Seq("show_name", "show_start", "show_end")).
  show
// +---------+-------------------+-------------------+----+-------------------+-------------------+------------+
// |show_name|         show_start|           show_end|user|        event_start|          event_end|active_users|
// +---------+-------------------+-------------------+----+-------------------+-------------------+------------+
// |      gee|2018-06-12 02:00:00|2018-06-12 03:00:00| 303|2018-06-12 01:00:30|2018-06-12 01:00:45|           0|
// |      bar|2018-06-12 00:00:00|2018-06-12 02:00:00| 302|2018-06-12 01:00:20|2018-06-12 01:00:50|           5|
// |      bar|2018-06-12 00:00:00|2018-06-12 02:00:00| 302|2018-06-12 00:00:15|2018-06-12 00:00:30|           5|
// |      bar|2018-06-12 00:00:00|2018-06-12 02:00:00| 301|2018-06-12 03:00:15|2018-06-12 03:00:45|           5|
// |      bar|2018-06-12 00:00:00|2018-06-12 02:00:00| 288|2018-06-12 00:00:47|2018-06-12 00:00:58|           5|
// |      foo|2018-06-12 00:00:00|2018-06-12 01:00:00| 287|2018-06-12 00:00:45|2018-06-12 00:00:53|           4|
// |      foo|2018-06-12 00:00:00|2018-06-12 01:00:00| 286|2018-06-12 00:00:19|2018-06-12 00:00:48|           4|
// +---------+-------------------+-------------------+----+-------------------+-------------------+------------+
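If the number of distinct shows is too large to collect, everything can also be kept inside Spark with a non-equi join. This is only a sketch under that assumption, not part of the original answer, and the shows, activeCounts, withActive, viewers and viewer_share names are mine:

// Alternative sketch without collecting to the driver: left-join every event
// whose event_start falls inside a show's window, then count distinct users
// per show (shows with no matching events get 0).
val shows = df.select($"show_name", $"show_start", $"show_end").distinct

val activeCounts = shows.as("s").
  join(df.as("e"), $"e.event_start".between($"s.show_start", $"s.show_end"), "left").
  groupBy($"s.show_name", $"s.show_start", $"s.show_end").
  agg(countDistinct($"e.user").as("active_users"))

val withActive = df.join(activeCounts, Seq("show_name", "show_start", "show_end"))

// Hypothetical follow-up for the stated goal: each show's distinct viewers as
// a share of its active users (the share is null where active_users is 0).
withActive.groupBy($"show_name", $"show_start", $"show_end", $"active_users").
  agg(countDistinct($"user").as("viewers")).
  withColumn("viewer_share", $"viewers" / $"active_users").
  show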
