SQL lead, lag, first value, last value, rows between unbounded preceding, and the window clause in plain Scala (without Spark)

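Problem description

I am trying to convert SQL to Scala purely for my own learning. So far I have been able to work out sum, count, min, max, group by, join, and order by.

Using the sample data below, I would like to know how to do a few more things: lead, lag, first value, last value, and the window clause.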

package SqlConversion_to_Java_Scala

import scala.collection.JavaConversions._
import java.util._

object Example6SqlToScalaWindow {

  case class Employee(  var empid: Int,
                        var name: String,
                        var age: Int,
                        var dept: String,var salary:Int)

  def main(args: Array[String]): Unit = {

   /*
    select
        dept,
        sum(salary) over (partition by age order by age desc) as sum_over_age,
        max(salary) over (partition by age order by age desc) as max_over_age
    from table
    */

    // create the employee list
    val empData: List[Employee] = new ArrayList[Employee]()

    // add the rows through the java.util.List interface
    empData.add(new Employee(1, "Ajay", 25, "Technical", 35000))
    empData.add(new Employee(3, "Chandan", 22, "Technical", 30000))
    empData.add(new Employee(4, "Arjun", 30, "Management", 54000))
    empData.add(new Employee(2, "Arun", 28, "Sales", 9000))
    empData.add(new Employee(8, "Anmol", 28, "Sales", 15000))
    empData.add(new Employee(9, "Vivek", 20, "Management", 8000))
    empData.add(new Employee(10, "Nikhil", 20, "Sales", 7000))
    empData.add(new Employee(5, "Rahul", 30, "Management", 60000))
    empData.add(new Employee(6, "Ganesh", 32, "Sales", 35000))
    empData.add(new Employee(7, "Vishal", 32, "Technical", 40000))
    empData.add(new Employee(11, "Anmol", 25, "Sales", 15000))
    empData.add(new Employee(12, "Vivek", 25, "Management", 8000))
    empData.add(new Employee(13, "Nikhil", 30, "Technical", 7000))

    val empSum = empData
      .map(rows => (rows.dept,rows.salary))
      .groupBy(data => (data._1))
      .mapValues(_.map(_._2).sum).toMap

    val empCount = empData
      .map(rows => (rows.dept,rows.salary))
      .groupBy(data => (data._1))
      .mapValues(_.length).toMap

    val empMax = empData
      .map(rows => (rows.dept,rows.salary))
      .groupBy(data => (data._1))
      .mapValues(_.map(_._2).max).toMap

    val empMin = empData
      .map(rows => (rows.dept,rows.salary))
      .groupBy(data => (data._1))
      .mapValues(_.map(_._2).min).toMap

    val empDeptName = empData
      .map(rows => (rows.dept,rows.dept)).toMap

    val sumPartitionByDeptAge = empData
      .map(rows => (rows.dept,rows.age,rows.salary))
      .groupBy(data => (data._1,data._2))
      .mapValues(_.map(_._3).sum).toMap


    empData.sortBy(data => (data.dept,data.age))
      .map(rows => (
        rows.empid,
        rows.name,
        rows.age,
        rows.salary,
        empDeptName.getOrElse(rows.dept,0),
        empMax.getOrElse(rows.dept,0),
        empMin.getOrElse(rows.dept,0),
        empSum.getOrElse(rows.dept,0),
        sumPartitionByDeptAge.getOrElse((rows.dept,rows.age),0)
      )
      )
      .foreach(println)

  }

}

Thanks, Sri

Tags: scala

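Solution

Here is a cleaned-up version of the code.

I added first and last, since they are just the standard head and last operations on a list.

The others (lead, lag, window) can be implemented by calling sliding(2) or sliding(window) on the List and then processing the appropriate element of each window (care is needed with the corner cases at each end of the list).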
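As a rough illustration of the sliding(2) idea for lead and lag, separate from the cleaned-up code that follows, here is a minimal sketch. The names LeadLagSketch, withLag, and withLead are hypothetical and not part of the original answer. It assumes the rows passed in already belong to one partition and are already in the desired ORDER BY order, and it uses Option to model the SQL NULL at each end of the list.

```scala
object LeadLagSketch {
  // Same shape as the Employee case class used in the question.
  final case class Employee(empid: Int, name: String, age: Int, dept: String, salary: Int)

  // lag(salary, 1): pair every row with the previous row's salary.
  // The first row has no predecessor, so it gets None.
  def withLag(rows: List[Employee]): List[(Employee, Option[Int])] = rows match {
    case Nil => Nil
    case first :: _ =>
      // A one-element list yields a single window of size 1, which the
      // two-element pattern skips; that is the corner case to watch.
      val rest = rows.sliding(2).collect {
        case List(prev, cur) => (cur, Option(prev.salary))
      }.toList
      (first, Option.empty[Int]) :: rest
  }

  // lead(salary, 1): pair every row with the next row's salary.
  // The last row has no successor, so it gets None.
  def withLead(rows: List[Employee]): List[(Employee, Option[Int])] = rows match {
    case Nil => Nil
    case _ =>
      val init = rows.sliding(2).collect {
        case List(cur, next) => (cur, Option(next.salary))
      }.toList
      init ++ List((rows.last, Option.empty[Int]))
  }

  def main(args: Array[String]): Unit = {
    // Sales rows from the sample data, already ordered by age.
    val sales = List(
      Employee(10, "Nikhil", 20, "Sales", 7000),
      Employee(11, "Anmol", 25, "Sales", 15000),
      Employee(2, "Arun", 28, "Sales", 9000)
    )
    withLag(sales).foreach(println)
    withLead(sales).foreach(println)
  }
}
```

To mimic partition by dept, one option is to group with groupBy(_.dept), sort each group, and apply these helpers to each group in turn.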

case class Employee(
  empid: Int,
  name: String,
  age: Int,
  dept: String,
  salary: Int,
)

// create the employee list
val empData = List(
  Employee(1, "Ajay", 25, "Technical", 35000),
  Employee(3, "Chandan", 22, "Technical", 30000),
  Employee(4, "Arjun", 30, "Management", 54000),
  Employee(2, "Arun", 28, "Sales", 9000),
  Employee(8, "Anmol", 28, "Sales", 15000),
  Employee(9, "Vivek", 20, "Management", 8000),
  Employee(10, "Nikhil", 20, "Sales", 7000),
  Employee(5, "Rahul", 30, "Management", 60000),
  Employee(6, "Ganesh", 32, "Sales", 35000),
  Employee(7, "Vishal", 32, "Technical", 40000),
  Employee(11, "Anmol", 25, "Sales", 15000),
  Employee(12, "Vivek", 25, "Management", 8000),
  Employee(13, "Nikhil", 30, "Technical", 7000),
)

val byDept: Map[String, List[Employee]] =
  empData.groupBy(_.dept)

val byDeptAge: Map[(String, Int), List[Employee]] =
  empData.groupBy(row => (row.dept, row.age))

val empSum: Map[String, Int] =
  byDept.map { case (dept, rows) => dept -> rows.map(_.salary).sum }

val empCount: Map[String, Int] =
  byDept.map { case (dept, rows) => dept -> rows.length }

val empMax: Map[String, Int] =
  byDept.map { case (dept, rows) => dept -> rows.map(_.salary).max }

val empMin: Map[String, Int] =
  byDept.map { case (dept, rows) => dept -> rows.map(_.salary).min }

val empDeptName: Map[String, String] =
  empData.map(row => (row.dept, row.dept)).toMap

val sumPartitionByDeptAge: Map[(String, Int), Int] =
  byDeptAge.map { case (deptAge, rows) => deptAge -> rows.map(_.salary).sum }

val empDeptFirst: Map[String, Employee] =
  byDept.map { case (dept, rows) => dept -> rows.head }

val empDeptLast: Map[String, Employee] =
  byDept.map { case (dept, rows) => dept -> rows.last }


empData.sortBy(data => (data.dept, data.age))
  .map(rows => (
    rows.empid,
    rows.name,
    rows.age,
    rows.salary,
    empDeptName.getOrElse(rows.dept, ""),
    empMax.getOrElse(rows.dept, 0),
    empMin.getOrElse(rows.dept, 0),
    empSum.getOrElse(rows.dept, 0),
    sumPartitionByDeptAge.getOrElse((rows.dept, rows.age), 0)
  )
  )
  .foreach(println)
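The window-clause part of the question (for example, rows between unbounded preceding and current row) is not covered by the code above. One possible sketch uses scanLeft for a running sum and an index-based slice for a bounded frame. The names WindowFrameSketch, runningSum, movingSum, and overDept are hypothetical, and ordering each partition by (age, empid) is an assumption chosen only to make the order deterministic.

```scala
object WindowFrameSketch {
  // Same shape as the Employee case class used above.
  final case class Employee(empid: Int, name: String, age: Int, dept: String, salary: Int)

  // sum(salary) over (... rows between unbounded preceding and current row).
  // The rows must already be in the partition's ORDER BY order.
  def runningSum(rows: List[Employee]): List[(Employee, Int)] = {
    // scanLeft returns the seed followed by every prefix sum; drop the seed.
    val prefixSums = rows.scanLeft(0)((acc, e) => acc + e.salary).tail
    rows.zip(prefixSums)
  }

  // sum(salary) over (... rows between <preceding> preceding and current row).
  def movingSum(rows: List[Employee], preceding: Int): List[(Employee, Int)] = {
    val salaries = rows.map(_.salary).toVector
    rows.zipWithIndex.map { case (e, i) =>
      // max(0, ...) clamps the frame at the start of the partition.
      (e, salaries.slice(math.max(0, i - preceding), i + 1).sum)
    }
  }

  // Emulates "partition by dept order by age, empid" and applies a
  // per-partition window function f to each ordered partition.
  def overDept[A](emps: List[Employee])(f: List[Employee] => List[A]): List[A] =
    emps.groupBy(_.dept).toList.sortBy(_._1).flatMap { case (_, rows) =>
      f(rows.sortBy(e => (e.age, e.empid)))
    }

  def main(args: Array[String]): Unit = {
    val emps = List(
      Employee(2, "Arun", 28, "Sales", 9000),
      Employee(10, "Nikhil", 20, "Sales", 7000),
      Employee(11, "Anmol", 25, "Sales", 15000),
      Employee(4, "Arjun", 30, "Management", 54000),
      Employee(9, "Vivek", 20, "Management", 8000)
    )
    overDept(emps)(runningSum).foreach(println)
    overDept(emps)(movingSum(_, 1)).foreach(println)
  }
}
```

Note that first and last in the code above follow the input order within each department; to match first_value and last_value with an order by, each group would need to be sorted first, as overDept does here.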
