首页 > 解决方案 > 如何使用 Apache Spark 和 Sparkube 实施“最后一个非空”措施?

问题描述

我们结合使用 Apache Spark 和 Sparkube 来动态创建在线分析环境。数据在 Spark 中准备,并使用 Sparkube 作为多维立方体公开。Sparkube 自动发布简单的聚合(SUM、MIN、MAX、AVG、STD...),但我们如何支持“最后一个非空”类型的聚合呢?

以这个数据集为例,其中定期记录各种产品的库存数量。2018年的存量不应该是当年的存量记录的总和,而是当年最新的一个。

Time,Product,Stock
2017-11-01, Oranges, 40000
2017-11-01, Apples, 120000
2017-12-01, Oranges, 42000
2017-12-01, Apples, 110000
2018-01-01, Oranges, 50000
2018-01-01, Apples, 100000
2018-02-01, Oranges, 48000
2018-02-01, Apples, 130000
2018-03-01, Oranges, 46000
2018-03-01, Apples, 120000

标签: apache-sparkolapolap-cube

解决方案


问题是库存数量记录(或一般的头寸)不会随着时间的推移而聚合。但数量变化确实如此。因此,您可以做的是使用 Spark 计算库存数量变化,这将在一个多维数据集中一致地求和,然后从您的 OLAP 前端创建一个计算度量,该度量将(重新)计算多维数据集中任何位置的实际库存。

在 Spark 中加载数据集并计算每个产品的库存变化

import org.apache.spark.sql.expressions.Window

var ds = spark.read
.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("/path/to/stock.csv")

val ws = Window.partitionBy("Product").orderBy("Time")

val ds2 = ds
  .withColumn("PreviousStockTmp", lag(col("Stock"), 1).over(ws))
  .withColumn("PreviousStock", when($"PreviousStockTmp".isNull, 0).otherwise($"PreviousStockTmp"))
  .drop("PreviousStockTmp")
  .withColumn("StockVariation", col("Stock").minus(col("PreviousStock"))).orderBy("Time")

添加对在线分析有用的通常的“年”、“月”、“日”等列

val ds3 = ds2
  .withColumn("Year", year(col("Time")))
  .withColumn("Month", month(col("Time")))
  .withColumn("Day", dayofmonth(col("Time")))

最后使用 Sparkube 将数据集发布为立方体

import com.activeviam.sparkube._
new Sparkube().fromDataset(ds3).expose()

现在您可以在 Excel、Tableau 或 ActiveUI 中浏览多维数据集,您可以在图表和数据透视表中使用“StockVariation.SUM”度量。在 MDX 中,您可以创建一个计算度量来从变化中计算库存:

WITH
 Member [Measures].[Stock] AS (
  (
    [Measures].[Stock],
    [Year].CurrentMember.PrevMember
  ) + [Measures].[StockVariation.SUM]
) 
SELECT
  NON EMPTY Crossjoin(
    [Year].[Year].[Year].Members,
    {
      [Measures].[Stock]
    }
  ) ON COLUMNS,
  NON EMPTY [Product].[Product].[Product].Members ON ROWS
  FROM [_sparkube_1]

在此处输入图像描述


推荐阅读