scala spark - groupBy to find mean between months in a date range

Problem Description

I am looking at a drone rental dataset. I would like to use a Spark groupBy to show the mean result ($) each drone made as a function of the days it spent in each month.

i.e. the value in the Result column divided by the total days, then attributed to the days in each month between the start and end dates (see the worked example after the table).

+-----+----------------+----------------+------+
|Drone|Start           |End             |Result|
+-----+----------------+----------------+------+
|DR1  |16/06/2013 10:30|22/08/2013 07:00|2786  |
|DR1  |20/04/2013 23:30|16/06/2013 10:30|7126  |
|DR1  |24/01/2013 23:00|20/04/2013 23:30|2964  |
|DR2  |01/03/2014 19:00|07/05/2014 18:00|8884  |
|DR2  |04/09/2015 09:00|04/11/2015 07:00|7828  |
|DR2  |04/10/2013 05:00|24/12/2013 07:00|5700  |
+-----+----------------+----------------+------+
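For example, DR1's first rental spans 16/06/2013 to 22/08/2013, 67 days in total, so its 2786 works out to 2786 / 67 ≈ 41.58 per day, and that daily rate would be attributed to the days falling in June, July and August.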

This is difficult because it is a longer-term rental business: each value spans a range of dates rather than being associated with a single date, so a simple groupBy isn't working for me.

Kindly note that the drones are hired on a per-minute basis, and the full dataset is a little messier.

I would appreciate some help on the correct thought process for approaching a problem like this and what the code would look like.

How would you change what I have written below to consider each month as a separate case? (I can only base it on the start date) :/

val df_avgs = df.groupBy("Start").mean()
df_avgs.select($"Start", $"avg(Result)").show()

Taking the first example from each drone, my expected output would be:

+-----+------+----+----+
|Drone|Month |Days|Avg |
+-----+------+----+----+
|DR1  |June  |X   |$YY |
|DR1  |July  |X   |$YY |
|DR1  |August|X   |$YY |
|DR2  |March |Y   |$ZZ |
|DR2  |April |Y   |$ZZ |
|DR2  |May   |Y   |$ZZ |
+-----+------+----+----+

Thanks so much

Tags: scala, apache-spark

Solution


Could you check whether this works for you? I used "MMM-yy" as the date format so that a rental spanning a year boundary is easy to tell apart. If you only need the month, you can change it to "MMM".

scala> val df_t = Seq(("DR1","16/06/2013 10:30","22/08/2013 07:00",2786),("DR1","20/04/2013 23:30","16/06/2013 10:30",7126),("DR1","24/01/2013 23:00","20/04/2013 23:30",2964),("DR2","01/03/2014 19:00","07/05/2014 18:00",8884),("DR2","04/09/2015 09:00","04/11/2015 07:00",7828),("DR2","04/10/2013 05:00","24/12/2013 07:00",5700)).toDF("drone","start","end","result")
df_t: org.apache.spark.sql.DataFrame = [drone: string, start: string ... 2 more fields]

scala> val df = df_t.withColumn("start",to_timestamp('start,"dd/MM/yyyy HH:mm")).withColumn("end",to_timestamp('end,"dd/MM/yyyy HH:mm"))
df: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 2 more fields]

scala> df.show(false)
+-----+-------------------+-------------------+------+
|drone|start              |end                |result|
+-----+-------------------+-------------------+------+
|DR1  |2013-06-16 10:30:00|2013-08-22 07:00:00|2786  |
|DR1  |2013-04-20 23:30:00|2013-06-16 10:30:00|7126  |
|DR1  |2013-01-24 23:00:00|2013-04-20 23:30:00|2964  |
|DR2  |2014-03-01 19:00:00|2014-05-07 18:00:00|8884  |
|DR2  |2015-09-04 09:00:00|2015-11-04 07:00:00|7828  |
|DR2  |2013-10-04 05:00:00|2013-12-24 07:00:00|5700  |
+-----+-------------------+-------------------+------+
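One note, since the full dataset is messier: with the default (non-ANSI) parser, to_timestamp returns null for values that don't match the pattern, so a quick filter flags the rows that failed to parse. A minimal sketch:

// hypothetical sanity check: rows whose start/end failed to parse come back as null
df.filter('start.isNull || 'end.isNull).show(false)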


scala> :paste
// Entering paste mode (ctrl-D to finish)

def months_range(a: java.sql.Date, b: java.sql.Date): Seq[String] = {
  import java.time._
  import java.time.format._
  val start = a.toLocalDate
  val end   = b.toLocalDate
  // Walk every day from start (inclusive) to end (exclusive), format each day
  // as "MMM-yy", then collapse to the distinct months covered by the rental.
  (start.toEpochDay until end.toEpochDay)
    .map(LocalDate.ofEpochDay(_))
    .map(DateTimeFormatter.ofPattern("MMM-yy").format(_))
    .toSet.toSeq
}

// Exiting paste mode, now interpreting.

months_range: (a: java.sql.Date, b: java.sql.Date)Seq[String]

scala> val udf_months_range = udf(  months_range(_:java.sql.Date,_:java.sql.Date):Seq[String] )
udf_months_range: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(StringType,true),Some(List(DateType, DateType)))

scala> val df2 = df.withColumn("days",datediff('end,'start)).withColumn("diff_months",udf_months_range('start,'end))
df2: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 4 more fields]

scala> df2.show(false)
+-----+-------------------+-------------------+------+----+--------------------------------+
|drone|start              |end                |result|days|diff_months                     |
+-----+-------------------+-------------------+------+----+--------------------------------+
|DR1  |2013-06-16 10:30:00|2013-08-22 07:00:00|2786  |67  |[Jun-13, Jul-13, Aug-13]        |
|DR1  |2013-04-20 23:30:00|2013-06-16 10:30:00|7126  |57  |[Apr-13, May-13, Jun-13]        |
|DR1  |2013-01-24 23:00:00|2013-04-20 23:30:00|2964  |86  |[Jan-13, Feb-13, Mar-13, Apr-13]|
|DR2  |2014-03-01 19:00:00|2014-05-07 18:00:00|8884  |67  |[Mar-14, Apr-14, May-14]        |
|DR2  |2015-09-04 09:00:00|2015-11-04 07:00:00|7828  |61  |[Sep-15, Oct-15, Nov-15]        |
|DR2  |2013-10-04 05:00:00|2013-12-24 07:00:00|5700  |81  |[Oct-13, Nov-13, Dec-13]        |
+-----+-------------------+-------------------+------+----+--------------------------------+
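As an aside, on Spark 2.4+ you could build the same month list without a UDF, using the built-in sequence, transform and array_distinct higher-order functions. A minimal sketch under that assumption (note that sequence is inclusive of the end date, unlike the exclusive until in the UDF, so a rental ending on the 1st of a month may pick up one extra month; `end` is backticked because it is a SQL keyword):

// assumes Spark 2.4+; sequence() over two dates steps one day at a time
val df2_alt = df
  .withColumn("days", datediff('end, 'start))
  .withColumn("diff_months",
    expr("array_distinct(transform(sequence(to_date(start), to_date(`end`)), d -> date_format(d, 'MMM-yy')))"))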


scala> df2.withColumn("month",explode('diff_months)).withColumn("Avg",'result/'days).select("drone","month","days","avg").show(false)
+-----+------+----+------------------+
|drone|month |days|avg               |
+-----+------+----+------------------+
|DR1  |Jun-13|67  |41.582089552238806|
|DR1  |Jul-13|67  |41.582089552238806|
|DR1  |Aug-13|67  |41.582089552238806|
|DR1  |Apr-13|57  |125.01754385964912|
|DR1  |May-13|57  |125.01754385964912|
|DR1  |Jun-13|57  |125.01754385964912|
|DR1  |Jan-13|86  |34.46511627906977 |
|DR1  |Feb-13|86  |34.46511627906977 |
|DR1  |Mar-13|86  |34.46511627906977 |
|DR1  |Apr-13|86  |34.46511627906977 |
|DR2  |Mar-14|67  |132.59701492537314|
|DR2  |Apr-14|67  |132.59701492537314|
|DR2  |May-14|67  |132.59701492537314|
|DR2  |Sep-15|61  |128.327868852459  |
|DR2  |Oct-15|61  |128.327868852459  |
|DR2  |Nov-15|61  |128.327868852459  |
|DR2  |Oct-13|81  |70.37037037037037 |
|DR2  |Nov-13|81  |70.37037037037037 |
|DR2  |Dec-13|81  |70.37037037037037 |
+-----+------+----+------------------+


scala>

Edit 1

Splitting the result by the number of days that actually fall in each month. The UDF has to change: instead of the list of months, it returns a map from each month to the number of days spent in it.

scala> :paste
// Entering paste mode (ctrl-D to finish)

def months_range(a: java.sql.Date, b: java.sql.Date) = {
  import java.time._
  import java.time.format._
  val start = a.toLocalDate
  val end   = b.toLocalDate
  // Walk every day from start (inclusive) to end (exclusive), format each day
  // as "MMM-yy", then count the days falling in each month: Map(month -> days).
  (start.toEpochDay until end.toEpochDay)
    .map(LocalDate.ofEpochDay(_))
    .map(DateTimeFormatter.ofPattern("MMM-yy").format(_))
    .groupBy(identity)
    .map(x => (x._1, x._2.length))
}

// Exiting paste mode, now interpreting.

months_range: (a: java.sql.Date, b: java.sql.Date)scala.collection.immutable.Map[String,Int]

scala> val udf_months_range = udf(  months_range(_:java.sql.Date,_:java.sql.Date):Map[String,Int] )
udf_months_range: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(StringType,IntegerType,false),Some(List(DateType, DateType)))
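Exploding a MapType column produces two columns, the key and the value, which is why the select below aliases them with .as(Seq("month","month_days")).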

scala>  val df2 = df.withColumn("days",datediff('end,'start)).withColumn("diff_months",udf_months_range('start,'end))
df2: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 4 more fields]

scala> val df3=df2.select(col("*"),explode('diff_months).as(Seq("month","month_days")) ).withColumn("mnth_rent",'result*('month_days/'days)).select("drone","month","month_days","days","mnth_rent")
df3: org.apache.spark.sql.DataFrame = [drone: string, month: string ... 3 more fields]

scala> df3.show(false)
+-----+------+----------+----+------------------+
|drone|month |month_days|days|mnth_rent         |
+-----+------+----------+----+------------------+
|DR1  |Aug-13|21        |67  |873.223880597015  |
|DR1  |Jul-13|31        |67  |1289.044776119403 |
|DR1  |Jun-13|15        |67  |623.7313432835821 |
|DR1  |May-13|31        |57  |3875.543859649123 |
|DR1  |Apr-13|11        |57  |1375.1929824561403|
|DR1  |Jun-13|15        |57  |1875.2631578947367|
|DR1  |Apr-13|19        |86  |654.8372093023256 |
|DR1  |Feb-13|28        |86  |965.0232558139536 |
|DR1  |Mar-13|31        |86  |1068.4186046511627|
|DR1  |Jan-13|8         |86  |275.72093023255815|
|DR2  |Apr-14|30        |67  |3977.910447761194 |
|DR2  |Mar-14|31        |67  |4110.507462686567 |
|DR2  |May-14|6         |67  |795.5820895522388 |
|DR2  |Nov-15|3         |61  |384.983606557377  |
|DR2  |Oct-15|31        |61  |3978.1639344262294|
|DR2  |Sep-15|27        |61  |3464.8524590163934|
|DR2  |Nov-13|30        |81  |2111.111111111111 |
|DR2  |Oct-13|28        |81  |1970.3703703703702|
|DR2  |Dec-13|23        |81  |1618.5185185185185|
+-----+------+----------+----+------------------+


scala> df3.groupBy('drone,'month).agg(sum('month_days).as("s_month_days"),sum('mnth_rent).as("mnth_rent"),max('days).as("days")).orderBy('drone,'month).show(false)
+-----+------+------------+------------------+----+
|drone|month |s_month_days|mnth_rent         |days|
+-----+------+------------+------------------+----+
|DR1  |Apr-13|30          |2030.030191758466 |86  |
|DR1  |Aug-13|21          |873.223880597015  |67  |
|DR1  |Feb-13|28          |965.0232558139536 |86  |
|DR1  |Jan-13|8           |275.72093023255815|86  |
|DR1  |Jul-13|31          |1289.044776119403 |67  |
|DR1  |Jun-13|30          |2498.994501178319 |67  |
|DR1  |Mar-13|31          |1068.4186046511627|86  |
|DR1  |May-13|31          |3875.543859649123 |57  |
|DR2  |Apr-14|30          |3977.910447761194 |67  |
|DR2  |Dec-13|23          |1618.5185185185185|81  |
|DR2  |Mar-14|31          |4110.507462686567 |67  |
|DR2  |May-14|6           |795.5820895522388 |67  |
|DR2  |Nov-13|30          |2111.111111111111 |81  |
|DR2  |Nov-15|3           |384.983606557377  |61  |
|DR2  |Oct-13|28          |1970.3703703703702|81  |
|DR2  |Oct-15|31          |3978.1639344262294|61  |
|DR2  |Sep-15|27          |3464.8524590163934|61  |
+-----+------+------------+------------------+----+
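As a sanity check against the per-rental table above: DR1's Jun-13 row combines 15 days from the 67-day rental (623.73) and 15 days from the 57-day rental (1875.26), giving 30 days and 2498.99; and each rental's monthly rents sum back to its original result, e.g. 873.22 + 1289.04 + 623.73 ≈ 2786 for DR1's first rental.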


scala>
