scala - 在纯 Scala(不使用 Spark)中实现 SQL 的 lead、lag、first_value、last_value、rows between unbounded preceding 以及窗口子句
问题描述
为了自己学习,我正在尝试把 SQL 查询改写成 Scala 代码。目前已经解决了其中几个:sum、count、min、max、group by、join 和 order by。
我想基于下面的示例数据,了解另外几个功能该如何实现,即 lead、lag、first_value、last_value 以及窗口子句。
package SqlConversion_to_Java_Scala
import scala.collection.JavaConversions._
import java.util._
/** Re-creates a few SQL aggregate / window computations over an in-memory
  * employee table using only Scala collections, and prints one tuple per row.
  *
  * The data is held in a Scala `Seq`, so no Java collection and no implicit
  * Java-to-Scala conversion is involved in any of the transformations.
  */
object Example6SqlToScalaWindow {

  /** One row of the employee table.
    *
    * The fields remain `var` so that code assigning to them keeps compiling;
    * nothing inside this object mutates a row.
    */
  case class Employee(var empid: Int,
                      var name: String,
                      var age: Int,
                      var dept: String,
                      var salary: Int)

  /** Prints, for every employee ordered by (dept, age), the tuple
    * (empid, name, age, salary, dept, max, min, sum, sum per (dept, age)).
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    /*
     Rough SQL equivalent of what is printed:

       select empid, name, age, salary, dept,
              max(salary) over (partition by dept)      as max_over_dept,
              min(salary) over (partition by dept)      as min_over_dept,
              sum(salary) over (partition by dept)      as sum_over_dept,
              sum(salary) over (partition by dept, age) as sum_over_dept_age
       from employee
       order by dept, age
    */

    // The employee table. `Seq` is used (rather than the bare name `List`)
    // because the file-level `import java.util._` makes `List` and `Map`
    // resolve to the Java interfaces.
    val empData = Seq(
      Employee(1, "Ajay", 25, "Technical", 35000),
      Employee(3, "Chandan", 22, "Technical", 30000),
      Employee(4, "Arjun", 30, "Management", 54000),
      Employee(2, "Arun", 28, "Sales", 9000),
      Employee(8, "Anmol", 28, "Sales", 15000),
      Employee(9, "Vivek", 20, "Management", 8000),
      Employee(10, "Nikhil", 20, "Sales", 7000),
      Employee(5, "Rahul", 30, "Management", 60000),
      Employee(6, "Ganesh", 32, "Sales", 35000),
      Employee(7, "Vishal", 32, "Technical", 40000),
      Employee(11, "Anmol", 25, "Sales", 15000),
      Employee(12, "Vivek", 25, "Management", 8000),
      Employee(13, "Nikhil", 30, "Technical", 7000)
    )

    // Salaries grouped per department; every aggregate below is derived from
    // this single grouping. `map { case ... }` builds a strict Map, so no
    // `mapValues` view is involved.
    val salariesByDept =
      empData.groupBy(_.dept).map { case (dept, rows) => dept -> rows.map(_.salary) }

    // sum(salary) ... group by dept
    val empSum = salariesByDept.map { case (dept, salaries) => dept -> salaries.sum }

    // count(*) ... group by dept. Not part of the printed tuple; kept as the
    // COUNT counterpart of the other aggregates.
    val empCount = salariesByDept.map { case (dept, salaries) => dept -> salaries.length }

    // max(salary) / min(salary) ... group by dept. groupBy never produces an
    // empty group, so max/min cannot fail here.
    val empMax = salariesByDept.map { case (dept, salaries) => dept -> salaries.max }
    val empMin = salariesByDept.map { case (dept, salaries) => dept -> salaries.min }

    // dept -> dept lookup, mirroring the other per-department maps.
    val empDeptName = empData.map(row => row.dept -> row.dept).toMap

    // sum(salary) over (partition by dept, age)
    val sumPartitionByDeptAge =
      empData
        .groupBy(row => (row.dept, row.age))
        .map { case (deptAge, rows) => deptAge -> rows.map(_.salary).sum }

    empData
      .sortBy(row => (row.dept, row.age))
      .map { row =>
        (
          row.empid,
          row.name,
          row.age,
          row.salary,
          // String default keeps this element typed as String instead of Any.
          empDeptName.getOrElse(row.dept, ""),
          empMax.getOrElse(row.dept, 0),
          empMin.getOrElse(row.dept, 0),
          empSum.getOrElse(row.dept, 0),
          sumPartitionByDeptAge.getOrElse((row.dept, row.age), 0)
        )
      }
      .foreach(println)
  }
}
谢谢,Sri
解决方案
下面是整理后的代码版本。
我为 first 和 last 添加了相应的实现,因为它们只是标准的 head 和 last 操作。
其余几个(lead、lag、window)可以在 List 上使用 sliding(2) 或 sliding(window) 来实现,然后每次处理窗口中相应的元素(需要注意列表两端的边界情况)。
/** A single employee record: identifier, name, age, department and salary. */
case class Employee(empid: Int, name: String, age: Int, dept: String, salary: Int)
// The employee table used by every aggregate below.
val empData = List(
  Employee(1, "Ajay", 25, "Technical", 35000),
  Employee(3, "Chandan", 22, "Technical", 30000),
  Employee(4, "Arjun", 30, "Management", 54000),
  Employee(2, "Arun", 28, "Sales", 9000),
  Employee(8, "Anmol", 28, "Sales", 15000),
  Employee(9, "Vivek", 20, "Management", 8000),
  Employee(10, "Nikhil", 20, "Sales", 7000),
  Employee(5, "Rahul", 30, "Management", 60000),
  Employee(6, "Ganesh", 32, "Sales", 35000),
  Employee(7, "Vishal", 32, "Technical", 40000),
  Employee(11, "Anmol", 25, "Sales", 15000),
  Employee(12, "Vivek", 25, "Management", 8000),
  Employee(13, "Nikhil", 30, "Technical", 7000),
)

// Rows partitioned by department, and by (department, age). Within each
// group the rows keep the order they have in empData.
val byDept: Map[String, List[Employee]] =
  empData.groupBy(_.dept)
val byDeptAge: Map[(String, Int), List[Employee]] =
  empData.groupBy(row => (row.dept, row.age))

// Per-department aggregates: sum, count, max and min of salary.
// groupBy never yields an empty group, so max/min (and head/last further
// down) cannot fail on these lists.
val empSum: Map[String, Int] =
  byDept.map { case (dept, rows) => dept -> rows.map(_.salary).sum }
val empCount: Map[String, Int] =
  byDept.map { case (dept, rows) => dept -> rows.length }
val empMax: Map[String, Int] =
  byDept.map { case (dept, rows) => dept -> rows.map(_.salary).max }
val empMin: Map[String, Int] =
  byDept.map { case (dept, rows) => dept -> rows.map(_.salary).min }

// dept -> dept lookup, mirroring the other per-department maps.
val empDeptName: Map[String, String] =
  empData.map(row => (row.dept, row.dept)).toMap

// Sum of salary per (department, age) partition.
val sumPartitionByDeptAge: Map[(String, Int), Int] =
  byDeptAge.map { case (deptAge, rows) => deptAge -> rows.map(_.salary).sum }

// First and last row of each department, taken in empData order (no
// additional ordering is applied inside the partition).
val empDeptFirst: Map[String, Employee] =
  byDept.map { case (dept, rows) => dept -> rows.head }
val empDeptLast: Map[String, Employee] =
  byDept.map { case (dept, rows) => dept -> rows.last }

// Print one tuple per employee, ordered by (dept, age):
// (empid, name, age, salary, dept, max, min, sum, sum per (dept, age)).
empData
  .sortBy(row => (row.dept, row.age))
  .map { row =>
    (
      row.empid,
      row.name,
      row.age,
      row.salary,
      // String default keeps this element typed as String instead of Any.
      empDeptName.getOrElse(row.dept, ""),
      empMax.getOrElse(row.dept, 0),
      empMin.getOrElse(row.dept, 0),
      empSum.getOrElse(row.dept, 0),
      sumPartitionByDeptAge.getOrElse((row.dept, row.age), 0),
    )
  }
  .foreach(println)
推荐阅读
- android - Google In App Purchase 检索已消费的购买
- node.js - 如何取消对未授权资源的订阅
- elasticsearch - 在弹性搜索中使用 Jest 客户端更新部分文档
- python - Numpy:高效的矩阵索引
- angular - 如何在每次路由更改时禁用渲染 MasterComponent
- ruby-on-rails - 将字符串数组转换为 StrongParameters 白名单
- redirect - 将 Laravel 主页重定向到另一个 URL 的最佳实践?
- objective-c - Storekit:测试收据验证 - 何时使用哪个服务器地址?
- excel - 如何将模块中的格式化子例程应用于跨工作簿的各种范围
- java - 有没有其他方法可以将 lamda 变量带出 lamda 循环