apache-spark - 如何使用 Apache Spark 和 Sparkube 实施“最后一个非空”措施?
问题描述
我们结合使用 Apache Spark 和 Sparkube 来动态创建在线分析环境。数据在 Spark 中准备,并使用 Sparkube 作为多维立方体公开。Sparkube 自动发布简单的聚合(SUM、MIN、MAX、AVG、STD...),但我们如何支持“最后一个非空”类型的聚合呢?
以这个数据集为例,其中定期记录各种产品的库存数量。2018年的存量不应该是当年的存量记录的总和,而是当年最新的一个。
Time,Product,Stock
2017-11-01, Oranges, 40000
2017-11-01, Apples, 120000
2017-12-01, Oranges, 42000
2017-12-01, Apples, 110000
2018-01-01, Oranges, 50000
2018-01-01, Apples, 100000
2018-02-01, Oranges, 48000
2018-02-01, Apples, 130000
2018-03-01, Oranges, 46000
2018-03-01, Apples, 120000
解决方案
问题是库存数量记录(或一般的头寸)不会随着时间的推移而聚合。但数量变化确实如此。因此,您可以做的是使用 Spark 计算库存数量变化,这将在一个多维数据集中一致地求和,然后从您的 OLAP 前端创建一个计算度量,该度量将(重新)计算多维数据集中任何位置的实际库存。
在 Spark 中加载数据集并计算每个产品的库存变化
import org.apache.spark.sql.expressions.Window
var ds = spark.read
.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("/path/to/stock.csv")
val ws = Window.partitionBy("Product").orderBy("Time")
val ds2 = ds
.withColumn("PreviousStockTmp", lag(col("Stock"), 1).over(ws))
.withColumn("PreviousStock", when($"PreviousStockTmp".isNull, 0).otherwise($"PreviousStockTmp"))
.drop("PreviousStockTmp")
.withColumn("StockVariation", col("Stock").minus(col("PreviousStock"))).orderBy("Time")
添加对在线分析有用的通常的“年”、“月”、“日”等列
val ds3 = ds2
.withColumn("Year", year(col("Time")))
.withColumn("Month", month(col("Time")))
.withColumn("Day", dayofmonth(col("Time")))
最后使用 Sparkube 将数据集发布为立方体
import com.activeviam.sparkube._
new Sparkube().fromDataset(ds3).expose()
现在您可以在 Excel、Tableau 或 ActiveUI 中浏览多维数据集,您可以在图表和数据透视表中使用“StockVariation.SUM”度量。在 MDX 中,您可以创建一个计算度量来从变化中计算库存:
WITH
Member [Measures].[Stock] AS (
(
[Measures].[Stock],
[Year].CurrentMember.PrevMember
) + [Measures].[StockVariation.SUM]
)
SELECT
NON EMPTY Crossjoin(
[Year].[Year].[Year].Members,
{
[Measures].[Stock]
}
) ON COLUMNS,
NON EMPTY [Product].[Product].[Product].Members ON ROWS
FROM [_sparkube_1]
推荐阅读
- splunk - 使用 splunk 发现蛮力攻击
- python - 为什么 PyroPPL 中的 `Distribution` 类有 `log_prob()` 方法但没有 `prob()` 方法?
- linux - 与 GNU Make 链接时参数列表太长
- node.js - 转换 fastify-static 服务文件的响应
- java - 我们如何在 Java 中对二维数组的按行升序和按列降序进行排序?
- github - 列表中的 GitHub Markdown 链接无法正确显示
- python - 应更改 Python 日志文件的位置
- python - 按多个对象分组的功能
- javascript - React Chart Js 类组件
- c# - 将值从字典对象写入文本文件仅写入最后一个键值对