java - Apache Spark Group By(获取组中的第一个和最后一个值)
问题描述
我在学校云上的 VM 集群上运行 hadoop(老实说,不知道具体细节)。我正在使用 apache spark 与 hadoop 对话并运行我当前的代码。
我一直在尝试对我的数据执行一些聚合,以找到一个小时/天/月的总消耗值(数据中的 ENERGY_READING 列)
CONSUMPTION.tsv已经完成了一些操作
+--------+-------------------+----+--------------+
|HOUSE_ID|CONDATE |HOUR|ENERGY_READING|
+--------+-------------------+----+--------------+
|9 |2015-05-30 00:00:00|0 |11000.001444 |
|9 |2015-05-30 00:00:10|0 |11000.002888 |
|9 |2015-05-30 00:00:20|0 |11000.004332 |
|9 |2015-05-30 00:00:30|0 |11000.005776 |
|9 |2015-05-30 00:00:40|0 |11000.00722 |
|9 |2015-05-30 00:00:50|0 |11000.008664 |
|9 |2015-05-30 00:01:00|0 |11000.010108 |
|9 |2015-05-30 00:01:10|0 |11000.011552 |
|9 |2015-05-30 00:01:20|0 |11000.012996 |
|9 |2015-05-30 00:01:30|0 |11000.01444 |
|9 |2015-05-30 00:01:40|0 |11000.015884 |
|9 |2015-05-30 00:01:50|0 |11000.017328 |
|9 |2015-05-30 00:02:00|0 |11000.018772 |
|9 |2015-05-30 00:02:10|0 |11000.020216 |
|9 |2015-05-30 00:02:20|0 |11000.02166 |
|9 |2015-05-30 00:02:30|0 |11000.023104 |
|9 |2015-05-30 00:02:40|0 |11000.024548 |
|9 |2015-05-30 00:02:50|0 |11000.025992 |
|9 |2015-05-30 00:03:00|0 |11000.027436 |
|9 |2015-05-30 00:03:10|0 |11000.02888 |
+--------+-------------------+----+--------------+
Java 类
StructType schema = new StructType()
.add("LOG_ID",IntegerType)
.add("HOUSE_ID", IntegerType)
.add("CONDATE", StringType)
.add("ENERGY_READING", DoubleType)
.add("FLAG", IntegerType);
Dataset<Row> data = spark.read()
.option("header", true)
.option("delimiter", "\t")
.option("mode","DROPMALFORMED")
.schema(schema)
.csv("hdfs://hd-master:9820/CONSUMPTION.tsv");
data = data.withColumn("CONDATE", functions.to_timestamp(functions.col("CONDATE"),"yy-MM-dd HH:mm:ss.SSSSSSSSS").cast(TimestampType));
data = data.withColumn("HOUR", functions.hour(functions.col("CONDATE")));
Dataset<Row> df = data.select("HOUSE_ID","CONDATE","HOUR","ENERGY_READING");
因此,我拥有的数据每 10 秒递增一次。我想获取每个小时/天/月的第一个和最后一个值。
本质上,我想要的是 11000.001444 日的第一个值,在这种情况下,最后一个值可以说是 11000.01444。然后从第一个中减去第二个,以获得该小时/天/月的总消耗量。
这会给我一个输出
HOUSE_ID CONDATE HOUR ENERGY_READING
9 15-05-30 0 0.013
9 15-05-30 1 ...
解决方案
下面的代码将按分钟分组并计算该分钟的消耗:
import org.apache.spark.sql.expressions.Window
Dataset<Row> df2 = df.groupBy(
functions.col("HOUSE_ID"),
functions.minute(col("CONDATE")).alias("minute")
).agg(
functions.min("ENERGY_READING").alias("ENERGY_READING")
).withColumn(
"LAG_ENERGY_READING",
functions.lag(functions.col("ENERGY_READING"), 1).over(Window.partitionBy("HOUSE_ID").orderBy("minute"))
).withColumn(
"consumption",
functions.expr("ENERGY_READING - LAG_ENERGY_READING")
)
推荐阅读
- visual-studio-code - VSCode:显示默认设置
- python - 如何将 wkt 形式的多边形列表转换为多多边形?
- mergesort - 有人可以帮助理解归并排序算法吗?
- c# - 生成 Excel 文件,然后将数据输入到单元格中
- c# - 试图在 Chrome 中查看 DOM。无法找到接收 SuperSocket 错误
- java - (使用了无效的访问令牌)尝试获取 facebook test_user 的测试页面时
- python - 将具有日期时间值的列转换为每周的列
- laravel - 如何统计目录中的大量文件
- python - 如何根据另一个列值格式化散景数据表单元格,其中数据源中的列值包含空格?
- reactjs - React API 上下文路由器