首页 > 解决方案 > 如何在 Spark java 数据集中使用 groupByKey,然后沿着聚合执行自定义逻辑?

问题描述

我刚刚开始学习 Spark 并使用 Spark Java 来满足特定要求。我有以下格式的数据集

+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
|field1|            field2|              field3|    field4|    field5|     field6|field7|  field8|   BoxId|Index|
+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
|  1|         ABC|1234|385a3d24e| 3913647|   751923| 191|9977908|321799809|   1334|
|  1|       DFC|385a3d24e|3913637| 40010625|751923| 357.0|    9877908|321799841|   1332|
|  1|        SDC|385a3d24e|3913637|399787631|751923| 245.0|    363908|321799835|   1332|
|  1|       GFF|385a3d24e|3913637|399146918|751923| 275.0|    6977908|321799809|   1334|
|  1|       GFF|385a3d24e|3913637|399146918|751923| 275.0|    7975908|321799809|   1335|

我想按 BoxId 分组并保存每个组索引的 df

就像 Boxid 321799809 一样,数据框将是

+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
|field1|            field2|              field3|    field4|    field5|     field6|field7|  field8|   BoxId|Index|
+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
|  1|         ABC|1234|385a3d24e| 3913647|   751923| 191|9977908|321799809|   1334|
|  1|       GFF|385a3d24e|3913637|399146918|751923| 275.0|    6977908|321799809|   1334|
|  1|       GFF|385a3d24e|3913637|399146918|751923| 275.0|    7975908|321799809|   1335|

并且文件需要保存为 321799809/1334.csv (此 csv 将有两行), 321799809/1335.csv (这将只有一行)

对于 Boxid 321799841,数据框将是

+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
|field1|            field2|              field3|    field4|    field5|     field6|field7|  field8|   BoxId|Index|
+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
|  1|       DFC|385a3d24e|3913637| 40010625|751923| 357.0|    9877908|321799841|   1332|

并且文件需要保存为 321799841/1332.csv。

为此,我正在考虑编写一个自定义函数,因为保存文件逻辑是自定义的,如下所示

这个想法来自于阅读 Spark 数据集中的 groupByKey,沿着这个线程的聚合执行自定义逻辑

Sample custom function to write csv
 
writeCsv(df){
if(!file.exist(df.col(Index)))
    csvWrite(df); // where csv fields are filled.
else{
   append row

}

then use

df.groupByKey(t=>t.BoxId).mapGroups((df)=> writeCsv(df));

但是 groupByKey 的 java 语法需要两个参数,即我找不到任何示例的函数和编码器。

我试图做的是创建一个带有 POJO 的编码器,其中 pojo 类包含上述数据帧的字段。

Encoder<POJO> pojoEncoder = Encoders.bean(POJO.class);

df.as(probeEncoder).groupByKey(t->{return t.BoxId;}, pojoEncoder).mapGroups(); but it gives error at groupByKey()

任何人都可以帮我一些 groupByKey 的 java 例子吗?在爪哇。

标签: javadataframeapache-spark

解决方案


推荐阅读