Scala Spark: groupBy to find the mean between months in a date range
Problem description
I am looking at this drone rental dataset. I would like to aggregate the Result column in Spark to show the mean result ($) each drone made, as a function of the days it spent in each month,
i.e. the value in the Result column divided by the total rental days, then attributed to the days in each month between the start and end dates.
+-----+----------------+----------------+------+
|Drone|Start           |End             |Result|
+-----+----------------+----------------+------+
|DR1  |16/06/2013 10:30|22/08/2013 07:00|2786  |
|DR1  |20/04/2013 23:30|16/06/2013 10:30|7126  |
|DR1  |24/01/2013 23:00|20/04/2013 23:30|2964  |
|DR2  |01/03/2014 19:00|07/05/2014 18:00|8884  |
|DR2  |04/09/2015 09:00|04/11/2015 07:00|7828  |
|DR2  |04/10/2013 05:00|24/12/2013 07:00|5700  |
+-----+----------------+----------------+------+
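As a worked example of the split described above, take the first DR1 rental: 16/06/2013 to 22/08/2013 with Result 2786. This is a sketch that assumes days are counted from the start date up to but not including the end date, which is the convention the answer's UDF and `datediff` use. With $D$ the total rental days and $d_m$ the days that fall in month $m$:

```latex
\begin{aligned}
D &= \underbrace{15}_{\text{16--30 Jun}} + \underbrace{31}_{\text{Jul}} + \underbrace{21}_{\text{1--21 Aug}} = 67 \\
\text{share}_m &= \text{Result}\cdot\frac{d_m}{D} \\
\text{share}_{\text{Jun}} &= 2786\cdot\tfrac{15}{67} \approx 623.73 \\
\text{share}_{\text{Jul}} &= 2786\cdot\tfrac{31}{67} \approx 1289.04 \\
\text{share}_{\text{Aug}} &= 2786\cdot\tfrac{21}{67} \approx 873.22
\end{aligned}
```

Because $\sum_m d_m = D$, the three shares add back up to 2786, and $2786/67 \approx 41.58$ is the per-day figure.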
This is difficult because it is a longer-term rental business: each value spans a date range rather than being associated with a single date, so a simple groupBy isn't working for me.
Please note that the drone is hired on a per-minute basis, and the full dataset is a little messier.
I would appreciate some help on the correct thought process for approaching a problem like this and what the code would look like.
How would you change what I have written below to treat each month as a separate case? (At the moment I can only base it on the start date.) :/
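// groups by the exact Start value, so rentals only share a group if they start at the same moment
val df_avgs = df.groupBy("Start").mean()
df_avgs.select($"Start", $"avg(Result)").show()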
Taking the first rental of each drone, my expected output would be:
+-----+------+----+----+
|Drone|Month |Days|Avg |
+-----+------+----+----+
|DR1  |June  |X   |$YY |
|DR1  |July  |X   |$YY |
|DR1  |August|X   |$YY |
|DR2  |March |Y   |$ZZ |
|DR2  |April |Y   |$ZZ |
|DR2  |May   |Y   |$ZZ |
+-----+------+----+----+
Thanks so much
Solution
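Can you take a look at this? I used "MMM-yy" as the date format so that months are easy to tell apart when the start and end dates fall in different years. If you only need the month, you can change it to "MMM".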
scala> val df_t = Seq(("DR1","16/06/2013 10:30","22/08/2013 07:00",2786),("DR1","20/04/2013 23:30","16/06/2013 10:30",7126),("DR1","24/01/2013 23:00","20/04/2013 23:30",2964),("DR2","01/03/2014 19:00","07/05/2014 18:00",8884),("DR2","04/09/2015 09:00","04/11/2015 07:00",7828),("DR2","04/10/2013 05:00","24/12/2013 07:00",5700)).toDF("drone","start","end","result")
df_t: org.apache.spark.sql.DataFrame = [drone: string, start: string ... 2 more fields]
scala> val df = df_t.withColumn("start",to_timestamp('start,"dd/MM/yyyy HH:mm")).withColumn("end",to_timestamp('end,"dd/MM/yyyy HH:mm"))
df: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 2 more fields]
scala> df.show(false)
+-----+-------------------+-------------------+------+
|drone|start |end |result|
+-----+-------------------+-------------------+------+
|DR1 |2013-06-16 10:30:00|2013-08-22 07:00:00|2786 |
|DR1 |2013-04-20 23:30:00|2013-06-16 10:30:00|7126 |
|DR1 |2013-01-24 23:00:00|2013-04-20 23:30:00|2964 |
|DR2 |2014-03-01 19:00:00|2014-05-07 18:00:00|8884 |
|DR2 |2015-09-04 09:00:00|2015-11-04 07:00:00|7828 |
|DR2 |2013-10-04 05:00:00|2013-12-24 07:00:00|5700 |
+-----+-------------------+-------------------+------+
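The pattern passed to `to_timestamp` reads the day before the month, so `04/09/2015` is 4 September 2015, as the output above shows. The letters `dd`, `MM`, `yyyy`, `HH` and `mm` mean the same in Spark's datetime patterns and in `java.time.format.DateTimeFormatter`, so a pattern like this can be checked outside Spark. A minimal sketch; the object name `PatternCheck` is hypothetical:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// java.time check of the "dd/MM/yyyy HH:mm" pattern used with to_timestamp above.
// Runs without Spark; it only shows how the pattern reads the question's strings.
object PatternCheck {
  val inputFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm")

  // Parse one raw Start/End string: day first, then month, then a 24-hour time.
  def parse(raw: String): LocalDateTime = LocalDateTime.parse(raw, inputFormat)
}
```

For example, `PatternCheck.parse("04/09/2015 09:00")` returns `2015-09-04T09:00`.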
scala> :paste
// Entering paste mode (ctrl-D to finish)
def months_range(a:java.sql.Date,b:java.sql.Date):Seq[String]=
{
import java.time._
import java.time.format._
val start = a.toLocalDate
val end = b.toLocalDate
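// distinct keeps first-occurrence (chronological) order; toSet.toSeq does not guarantee it beyond four months
(start.toEpochDay until end.toEpochDay).map(LocalDate.ofEpochDay(_)).map(DateTimeFormatter.ofPattern("MMM-yy").format(_)).distinct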
}
// Exiting paste mode, now interpreting.
months_range: (a: java.sql.Date, b: java.sql.Date)Seq[String]
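The UDF body can be tried without a Spark session. The sketch below is a plain-Scala copy of `months_range` that takes `java.time.LocalDate` instead of `java.sql.Date`. The object name and the explicit `Locale.ENGLISH` are assumptions added here, because "MMM" month names otherwise follow the JVM's default locale.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

// Plain-Scala version of months_range (hypothetical name MonthsRange), no Spark needed.
object MonthsRange {
  // Locale.ENGLISH pins the month abbreviations to "Jan", "Feb", ...
  private val monthFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("MMM-yy", Locale.ENGLISH)

  // Distinct "MMM-yy" labels for every day from start (inclusive) to end (exclusive),
  // in chronological order; distinct keeps the first occurrence of each label.
  def monthsRange(start: LocalDate, end: LocalDate): Seq[String] =
    (start.toEpochDay until end.toEpochDay)
      .map(day => LocalDate.ofEpochDay(day))
      .map(date => monthFormat.format(date))
      .distinct
}
```

For the first DR1 rental, `MonthsRange.monthsRange(LocalDate.of(2013, 6, 16), LocalDate.of(2013, 8, 22))` gives `Jun-13`, `Jul-13`, `Aug-13`, matching the `diff_months` column. The end date itself is excluded by `until`.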
scala> val udf_months_range = udf( months_range(_:java.sql.Date,_:java.sql.Date):Seq[String] )
udf_months_range: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(StringType,true),Some(List(DateType, DateType)))
scala> val df2 = df.withColumn("days",datediff('end,'start)).withColumn("diff_months",udf_months_range('start,'end))
df2: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 4 more fields]
scala> df2.show(false)
+-----+-------------------+-------------------+------+----+--------------------------------+
|drone|start |end |result|days|diff_months |
+-----+-------------------+-------------------+------+----+--------------------------------+
|DR1 |2013-06-16 10:30:00|2013-08-22 07:00:00|2786 |67 |[Jun-13, Jul-13, Aug-13] |
|DR1 |2013-04-20 23:30:00|2013-06-16 10:30:00|7126 |57 |[Apr-13, May-13, Jun-13] |
|DR1 |2013-01-24 23:00:00|2013-04-20 23:30:00|2964 |86 |[Jan-13, Feb-13, Mar-13, Apr-13]|
|DR2 |2014-03-01 19:00:00|2014-05-07 18:00:00|8884 |67 |[Mar-14, Apr-14, May-14] |
|DR2 |2015-09-04 09:00:00|2015-11-04 07:00:00|7828 |61 |[Sep-15, Oct-15, Nov-15] |
|DR2 |2013-10-04 05:00:00|2013-12-24 07:00:00|5700 |81 |[Oct-13, Nov-13, Dec-13] |
+-----+-------------------+-------------------+------+----+--------------------------------+
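The `days` column comes from `datediff`, which only considers the date part of each timestamp. That matches the UDF, which also counts from the start date up to the end date, so the per-month days add up to `days`. A rough `java.time` analogue (the `DayCount` object is hypothetical) shows how calendar days differ from elapsed whole days for the DR2 rental from 2015-09-04 09:00 to 2015-11-04 07:00:

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

object DayCount {
  // Like datediff('end, 'start): truncate both timestamps to dates, then count days.
  def calendarDays(start: LocalDateTime, end: LocalDateTime): Long =
    ChronoUnit.DAYS.between(start.toLocalDate, end.toLocalDate)

  // Whole 24-hour days actually elapsed between the exact timestamps, for comparison.
  def elapsedDays(start: LocalDateTime, end: LocalDateTime): Long =
    ChronoUnit.DAYS.between(start, end)
}
```

`calendarDays` gives 61, the value in the `days` column, while `elapsedDays` gives 60 because the rental ends two hours short of 61 full days. With per-minute hire, that difference is worth deciding on explicitly.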
scala> df2.withColumn("month",explode('diff_months)).withColumn("Avg",'result/'days).select("drone","month","days","avg").show(false)
+-----+------+----+------------------+
|drone|month |days|avg |
+-----+------+----+------------------+
|DR1 |Jun-13|67 |41.582089552238806|
|DR1 |Jul-13|67 |41.582089552238806|
|DR1 |Aug-13|67 |41.582089552238806|
|DR1 |Apr-13|57 |125.01754385964912|
|DR1 |May-13|57 |125.01754385964912|
|DR1 |Jun-13|57 |125.01754385964912|
|DR1 |Jan-13|86 |34.46511627906977 |
|DR1 |Feb-13|86 |34.46511627906977 |
|DR1 |Mar-13|86 |34.46511627906977 |
|DR1 |Apr-13|86 |34.46511627906977 |
|DR2 |Mar-14|67 |132.59701492537314|
|DR2 |Apr-14|67 |132.59701492537314|
|DR2 |May-14|67 |132.59701492537314|
|DR2 |Sep-15|61 |128.327868852459 |
|DR2 |Oct-15|61 |128.327868852459 |
|DR2 |Nov-15|61 |128.327868852459 |
|DR2 |Oct-13|81 |70.37037037037037 |
|DR2 |Nov-13|81 |70.37037037037037 |
|DR2 |Dec-13|81 |70.37037037037037 |
+-----+------+----+------------------+
scala>
Edit 1
Split the result according to the number of days in each month. The code has to change starting from the UDF.
scala> :paste
// Entering paste mode (ctrl-D to finish)
def months_range(a:java.sql.Date,b:java.sql.Date)=
{
import java.time._
import java.time.format._
val start = a.toLocalDate
val end = b.toLocalDate
(start.toEpochDay until end.toEpochDay).map(LocalDate.ofEpochDay(_)).map(DateTimeFormatter.ofPattern("MMM-yy").format(_)).groupBy(identity).map( x => (x._1,x._2.length) )
}
// Exiting paste mode, now interpreting.
months_range: (a: java.sql.Date, b: java.sql.Date)scala.collection.immutable.Map[String,Int]
scala> val udf_months_range = udf( months_range(_:java.sql.Date,_:java.sql.Date):Map[String,Int] )
udf_months_range: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(StringType,IntegerType,false),Some(List(DateType, DateType)))
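The UDF above builds one `LocalDate` per rental day and then groups them. An alternative, sketched here under the same start-inclusive, end-exclusive convention, steps one `YearMonth` at a time and measures the overlap of the rental with each month, so the work grows with the number of months rather than days. It returns a `ListMap` so the months stay in chronological order. The object name `MonthDays` and the `Locale.ENGLISH` formatter are assumptions; using it as a UDF would still need a `java.sql.Date` wrapper like `months_range`.

```scala
import java.time.{LocalDate, YearMonth}
import java.time.format.DateTimeFormatter
import java.util.Locale
import scala.collection.immutable.ListMap

// Hypothetical alternative to the Edit 1 UDF body: days of [start, end) per calendar month,
// computed month by month instead of materialising every single day.
object MonthDays {
  private val monthFormat = DateTimeFormatter.ofPattern("MMM-yy", Locale.ENGLISH)

  def daysPerMonth(start: LocalDate, end: LocalDate): ListMap[String, Long] = {
    // Months from the start month up to the last month that begins before `end`.
    val months = Iterator
      .iterate(YearMonth.from(start))(_.plusMonths(1))
      .takeWhile(ym => ym.atDay(1).isBefore(end))
    val pairs = months.map { ym =>
      val monthStart = ym.atDay(1)
      val nextMonthStart = ym.plusMonths(1).atDay(1)
      val from = if (start.isAfter(monthStart)) start else monthStart     // later of the two
      val upTo = if (end.isBefore(nextMonthStart)) end else nextMonthStart // earlier of the two
      monthFormat.format(ym) -> (upTo.toEpochDay - from.toEpochDay)
    }
    // Drop empty overlaps (e.g. start == end) and keep chronological insertion order.
    ListMap(pairs.filter(_._2 > 0).toList: _*)
  }
}
```

For 2013-10-04 to 2013-12-24 it yields `Oct-13 -> 28`, `Nov-13 -> 30`, `Dec-13 -> 23`, the same `month_days` as the matching DR2 rows in the output below.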
scala> val df2 = df.withColumn("days",datediff('end,'start)).withColumn("diff_months",udf_months_range('start,'end))
df2: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 4 more fields]
scala> val df3=df2.select(col("*"),explode('diff_months).as(Seq("month","month_days")) ).withColumn("mnth_rent",'result*('month_days/'days)).select("drone","month","month_days","days","mnth_rent")
df3: org.apache.spark.sql.DataFrame = [drone: string, month: string ... 3 more fields]
scala> df3.show(false)
+-----+------+----------+----+------------------+
|drone|month |month_days|days|mnth_rent |
+-----+------+----------+----+------------------+
|DR1 |Aug-13|21 |67 |873.223880597015 |
|DR1 |Jul-13|31 |67 |1289.044776119403 |
|DR1 |Jun-13|15 |67 |623.7313432835821 |
|DR1 |May-13|31 |57 |3875.543859649123 |
|DR1 |Apr-13|11 |57 |1375.1929824561403|
|DR1 |Jun-13|15 |57 |1875.2631578947367|
|DR1 |Apr-13|19 |86 |654.8372093023256 |
|DR1 |Feb-13|28 |86 |965.0232558139536 |
|DR1 |Mar-13|31 |86 |1068.4186046511627|
|DR1 |Jan-13|8 |86 |275.72093023255815|
|DR2 |Apr-14|30 |67 |3977.910447761194 |
|DR2 |Mar-14|31 |67 |4110.507462686567 |
|DR2 |May-14|6 |67 |795.5820895522388 |
|DR2 |Nov-15|3 |61 |384.983606557377 |
|DR2 |Oct-15|31 |61 |3978.1639344262294|
|DR2 |Sep-15|27 |61 |3464.8524590163934|
|DR2 |Nov-13|30 |81 |2111.111111111111 |
|DR2 |Oct-13|28 |81 |1970.3703703703702|
|DR2 |Dec-13|23 |81 |1618.5185185185185|
+-----+------+----------+----+------------------+
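The question mentions that drones are hired per minute, while the split above uses whole calendar days. A minute-level variant is sketched below as a hypothetical alternative (`MinuteSplit` and its methods are not from the answer): it counts the minutes of the exact `[start, end)` interval that fall in each month and prorates Result by those minutes. It assumes `end` is after `start`.

```scala
import java.time.{LocalDateTime, YearMonth}
import java.time.temporal.ChronoUnit
import scala.collection.immutable.ListMap

// Hypothetical minute-level split: prorate Result by the minutes of [start, end) in each month.
// Month keys are YearMonth values rather than "MMM-yy" strings.
object MinuteSplit {
  def minutesPerMonth(start: LocalDateTime, end: LocalDateTime): ListMap[YearMonth, Long] = {
    val months = Iterator
      .iterate(YearMonth.from(start))(_.plusMonths(1))
      .takeWhile(ym => ym.atDay(1).atStartOfDay().isBefore(end))
    val pairs = months.map { ym =>
      val monthStart = ym.atDay(1).atStartOfDay()
      val nextMonthStart = ym.plusMonths(1).atDay(1).atStartOfDay()
      val from = if (start.isAfter(monthStart)) start else monthStart     // later of the two
      val upTo = if (end.isBefore(nextMonthStart)) end else nextMonthStart // earlier of the two
      ym -> ChronoUnit.MINUTES.between(from, upTo)
    }
    ListMap(pairs.filter(_._2 > 0).toList: _*)
  }

  // Each month's share of `result`, proportional to the minutes rented in that month.
  def prorate(result: Double, start: LocalDateTime, end: LocalDateTime): ListMap[YearMonth, Double] = {
    val minutes = minutesPerMonth(start, end)
    val total = minutes.values.sum.toDouble
    ListMap(minutes.toList.map { case (ym, m) => ym -> result * m / total }: _*)
  }
}
```

For the first DR1 rental (2013-06-16 10:30 to 2013-08-22 07:00) this gives 20970, 44640 and 30660 minutes for June, July and August, and the prorated shares of 2786 sum back to 2786 up to floating-point rounding.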
scala> df3.groupBy('drone,'month).agg(sum('month_days).as("s_month_days"),sum('mnth_rent).as("mnth_rent"),max('days).as("days")).orderBy('drone,'month).show(false)
+-----+------+------------+------------------+----+
|drone|month |s_month_days|mnth_rent |days|
+-----+------+------------+------------------+----+
|DR1 |Apr-13|30 |2030.030191758466 |86 |
|DR1 |Aug-13|21 |873.223880597015 |67 |
|DR1 |Feb-13|28 |965.0232558139536 |86 |
|DR1 |Jan-13|8 |275.72093023255815|86 |
|DR1 |Jul-13|31 |1289.044776119403 |67 |
|DR1 |Jun-13|30 |2498.994501178319 |67 |
|DR1 |Mar-13|31 |1068.4186046511627|86 |
|DR1 |May-13|31 |3875.543859649123 |57 |
|DR2 |Apr-14|30 |3977.910447761194 |67 |
|DR2 |Dec-13|23 |1618.5185185185185|81 |
|DR2 |Mar-14|31 |4110.507462686567 |67 |
|DR2 |May-14|6 |795.5820895522388 |67 |
|DR2 |Nov-13|30 |2111.111111111111 |81 |
|DR2 |Nov-15|3 |384.983606557377 |61 |
|DR2 |Oct-13|28 |1970.3703703703702|81 |
|DR2 |Oct-15|31 |3978.1639344262294|61 |
|DR2 |Sep-15|27 |3464.8524590163934|61 |
+-----+------+------------+------------------+----+
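Because `month` is an "MMM-yy" string, `orderBy('drone,'month)` sorts it alphabetically (Apr-13, Aug-13, Feb-13, ...), not by date. The smallest change in the Spark code is probably to use "yyyy-MM" as the UDF pattern, since those strings sort chronologically; if the "MMM-yy" labels must stay, they can be parsed back to a `YearMonth` for sorting. Both options are sketched in plain Scala; the `MonthOrder` object is hypothetical and assumes English month names.

```scala
import java.time.YearMonth
import java.time.format.DateTimeFormatter
import java.util.Locale

// Hypothetical helpers for ordering "MMM-yy" month labels by date rather than alphabetically.
object MonthOrder {
  private val shortLabel = DateTimeFormatter.ofPattern("MMM-yy", Locale.ENGLISH) // e.g. "Apr-13"
  private val isoLabel = DateTimeFormatter.ofPattern("yyyy-MM")                   // e.g. "2013-04"

  // Sort "MMM-yy" labels chronologically by parsing each one back to a YearMonth.
  def chronological(labels: Seq[String]): Seq[String] =
    labels.sortBy { s =>
      val ym = YearMonth.parse(s, shortLabel)
      (ym.getYear, ym.getMonthValue)
    }

  // Convert an "MMM-yy" label to "yyyy-MM", whose plain string order is chronological.
  def toIsoLabel(s: String): String = YearMonth.parse(s, shortLabel).format(isoLabel)
}
```

For example, `MonthOrder.chronological(Seq("Nov-15", "Mar-14", "Dec-13"))` returns `Dec-13`, `Mar-14`, `Nov-15`, which an alphabetical sort would not.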
scala>