How to split a row into multiple rows based on dates using Spark Scala?

Question

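I have a DataFrame with rows like the ones below. I need to split each row into a series of months based on pa_start_date and pa_end_date, adding two new columns, period_start_date and period_end_date.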

The input DataFrame df is:

    p_id pa_id  p_st_date   p_end_date     pa_start_date   pa_end_date  
    p1   pa1    2-Jan-18      5-Dec-18     2-Mar-18        8-Aug-18       
    p1   pa2    3-Jan-18      8-Dec-18     6-Mar-18        10-Nov-18   
    p1   pa3    1-Jan-17      1-Dec-17     9-Feb-17        20-Apr-17  
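Outside Spark, date strings in this shape (non-padded day, English month abbreviation, two-digit year, e.g. 2-Jan-18) can be parsed with java.time. The plain-Scala sketch below uses the pattern d-MMM-yy; `InputDates` is a hypothetical name, and Locale.ENGLISH is an assumption about the data. Spark 3's to_date patterns are based on the same DateTimeFormatter letters, so a single d should likewise accept both 2-Jan-18 and 10-Nov-18 there, but check the datetime-pattern documentation for your Spark version.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

object InputDates {
  // "d" accepts one- or two-digit days; "yy" resolves two-digit years to 2000-2099.
  // Locale.ENGLISH is assumed so that "Jan", "Feb", ... are recognised.
  val fmt: DateTimeFormatter = DateTimeFormatter.ofPattern("d-MMM-yy", Locale.ENGLISH)

  // Parses one cell such as "2-Jan-18" into a LocalDate.
  def parse(s: String): LocalDate = LocalDate.parse(s.trim, fmt)

  def main(args: Array[String]): Unit =
    Seq("2-Jan-18", "10-Nov-18", "20-Apr-17").map(parse).foreach(println)
}
```

Running main prints the ISO forms 2018-01-02, 2018-11-10 and 2017-04-20.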

The expected output is:

p_id pa_id  p_st_date   p_end_date pa_start_date pa_end_date period_start_date period_end_date
p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18     2-Mar-18 31-Mar-18
p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18     1-Apr-18 30-Apr-18
p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18     1-May-18 31-May-18
p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18     1-Jun-18 30-Jun-18
p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18     1-Jul-18 31-Jul-18
p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18     1-Aug-18 31-Aug-18
p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18    6-Mar-18 31-Mar-18
p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18    1-Apr-18 30-Apr-18
p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18    1-May-18 31-May-18
p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18    1-Jun-18 30-Jun-18
p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18    1-Jul-18 31-Jul-18
p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18    1-Aug-18 31-Aug-18
p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18    1-Sep-18 30-Sep-18
p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18    1-Oct-18 31-Oct-18
p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18    1-Nov-18 30-Nov-18
p1   pa3    1-Jan-17    1-Dec-17   9-Feb-17      20-Apr-17    9-Feb-17 28-Feb-17
p1   pa3    1-Jan-17    1-Dec-17   9-Feb-17      20-Apr-17    1-Mar-17 31-Mar-17
p1   pa3    1-Jan-17    1-Dec-17   9-Feb-17      20-Apr-17    1-Apr-17 30-Apr-17

Tags: scala, date, apache-spark, dataframe

Answer


I solved it by creating the UDF shown below.

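Given pa_start_date and the number of months between pa_start_date and pa_end_date as arguments, this UDF returns an array of dates with one entry per month, starting at pa_start_date and moving forward one month at a time.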

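import java.sql.Date
import org.apache.spark.sql.functions.udf

// d: pa_start_date, n: number of months to generate.
// Returns n "yyyy-MM-dd" strings: d, d + 1 month, d + 2 months, ...
def udfFunc: (Date, Long) => Array[String] = (d, n) =>
    if (d == null) Array.empty[String] // unparseable pa_start_date: explode will drop the row
    else {
        val start = d.toLocalDate // java.time.LocalDate; its toString is ISO yyyy-MM-dd
        (0L until n).map(i => start.plusMonths(i).toString).toArray
    }
val my_udf = udf(udfFunc)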
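The UDF's date generation can be tried without Spark. The plain-Scala sketch below uses java.time; `monthCount` and `monthStarts` are hypothetical helper names, not part of the answer. It counts calendar months by truncating both dates to their month first, which avoids an off-by-one in ceil(months_between(end, start)): for 2-Mar-18 to 2-Aug-18 months_between is exactly 5, so only five months are generated although the range touches six, and for 20-Mar-18 to 5-Apr-18 it is about 0.52, so ceil gives 1 instead of 2.

```scala
import java.time.{LocalDate, YearMonth}
import java.time.temporal.ChronoUnit

object MonthSplit {
  // Hypothetical helper: number of calendar months touched by [start, end], both ends included.
  def monthCount(start: LocalDate, end: LocalDate): Long =
    ChronoUnit.MONTHS.between(YearMonth.from(start), YearMonth.from(end)) + 1

  // Mirrors the UDF: start itself, then start plus 1, 2, ... months, as ISO strings.
  def monthStarts(start: LocalDate, count: Long): Array[String] =
    (0L until count).map(i => start.plusMonths(i).toString).toArray

  def main(args: Array[String]): Unit = {
    val start = LocalDate.of(2018, 3, 2)
    val end = LocalDate.of(2018, 8, 8)
    println(monthStarts(start, monthCount(start, end)).mkString(", "))
  }
}
```

For the pa1 row this prints six dates, 2018-03-02 through 2018-08-02.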

The final DataFrame is then built as follows.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import ss.implicits._ // ss is the SparkSession (a val); enables the $"col" syntax

// "d" accepts both 2-Mar-18 and 10-Nov-18
val startDate = to_date(col("pa_start_date"), "d-MMM-yy")
val endDate = to_date(col("pa_end_date"), "d-MMM-yy")
// calendar months touched by [pa_start_date, pa_end_date], both ends included
val monthCount = (months_between(trunc(endDate, "month"), trunc(startDate, "month")) + 1).cast("long")

val df = ss.read.format("csv").option("header", true).load(path)
    .select($"p_id", $"pa_id", $"p_st_date", $"p_end_date", $"pa_start_date", $"pa_end_date",
        my_udf(startDate, monthCount).alias("udf")) // array with one date per month, from the UDF
    .withColumn("after_divide", explode($"udf")) // split the array into one row per date
    .withColumn("period_end_date", date_format(last_day($"after_divide"), "dd-MMM-yy")) // last day of that date's month
    .drop("udf")
    .withColumn("row_number", row_number().over(Window.partitionBy("p_id", "pa_id", "p_st_date", "p_end_date", "pa_start_date", "pa_end_date").orderBy(col("after_divide").asc))) // helper column for period_start_date below
    .withColumn("period_start_date", date_format(when(col("row_number") === 1, $"after_divide").otherwise(trunc($"after_divide", "month")), "dd-MMM-yy")) // row 1 keeps pa_start_date, later rows start on the 1st
    .drop("after_divide", "row_number") // drop the helper columns that are not needed in the output
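For a single row, the steps after explode (last_day for the period end; the first date kept as-is and later dates truncated to the 1st of the month) amount to the plain-Scala sketch below. `PeriodSplit`, `Period` and `periods` are hypothetical names; dates are formatted with the same dd-MMM-yy pattern as the Spark code, assuming English month abbreviations.

```scala
import java.time.{LocalDate, YearMonth}
import java.time.format.DateTimeFormatter
import java.util.Locale

object PeriodSplit {
  // Same output pattern as date_format(..., "dd-MMM-yy"); English month abbreviations assumed.
  private val fmt = DateTimeFormatter.ofPattern("dd-MMM-yy", Locale.ENGLISH)

  final case class Period(start: String, end: String)

  // One Period per calendar month touched by [paStart, paEnd].
  def periods(paStart: LocalDate, paEnd: LocalDate): List[Period] = {
    val first = YearMonth.from(paStart)
    val last = YearMonth.from(paEnd)
    Iterator.iterate(first)(_.plusMonths(1))
      .takeWhile(m => !m.isAfter(last))
      .map { m =>
        // The first period keeps the real start date; later ones start on the 1st (like trunc(..., "month")).
        val start = if (m == first) paStart else m.atDay(1)
        // Every period ends on the month's last day (like last_day).
        Period(start.format(fmt), m.atEndOfMonth().format(fmt))
      }
      .toList
  }

  def main(args: Array[String]): Unit =
    periods(LocalDate.of(2017, 2, 9), LocalDate.of(2017, 4, 20)).foreach(println)
}
```

For the pa3 row this yields the same three periods as the Spark output: 09-Feb-17 to 28-Feb-17, 01-Mar-17 to 31-Mar-17 and 01-Apr-17 to 30-Apr-17.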

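Here is the output. (period_start_date appears after period_end_date because it is added later; date_format with dd-MMM-yy zero-pads the day, e.g. 09-Feb-17, whereas d-MMM-yy gives 9-Feb-17 as in the expected output.)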

+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
|p_id|pa_id|p_st_date|p_end_date|pa_start_date|pa_end_date|period_end_date|period_start_date|
+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
|  p1|  pa3| 1-Jan-17|  1-Dec-17|     9-Feb-17|  20-Apr-17|      28-Feb-17|        09-Feb-17|
|  p1|  pa3| 1-Jan-17|  1-Dec-17|     9-Feb-17|  20-Apr-17|      31-Mar-17|        01-Mar-17|
|  p1|  pa3| 1-Jan-17|  1-Dec-17|     9-Feb-17|  20-Apr-17|      30-Apr-17|        01-Apr-17|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Mar-18|        06-Mar-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Apr-18|        01-Apr-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-May-18|        01-May-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Jun-18|        01-Jun-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Jul-18|        01-Jul-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Aug-18|        01-Aug-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Sep-18|        01-Sep-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Oct-18|        01-Oct-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Nov-18|        01-Nov-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-Mar-18|        02-Mar-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      30-Apr-18|        01-Apr-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-May-18|        01-May-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      30-Jun-18|        01-Jun-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-Jul-18|        01-Jul-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-Aug-18|        01-Aug-18|
+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
