java - 如何在 Spark java 数据集中使用 groupByKey,然后沿着聚合执行自定义逻辑?
问题描述
我刚刚开始学习 Spark 并使用 Spark Java 来满足特定要求。我有以下格式的数据集
+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
|field1| field2| field3| field4| field5| field6|field7| field8| BoxId|Index|
+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
| 1| ABC|1234|385a3d24e| 3913647| 751923| 191|9977908|321799809| 1334|
| 1| DFC|385a3d24e|3913637| 40010625|751923| 357.0| 9877908|321799841| 1332|
| 1| SDC|385a3d24e|3913637|399787631|751923| 245.0| 363908|321799835| 1332|
| 1| GFF|385a3d24e|3913637|399146918|751923| 275.0| 6977908|321799809| 1334|
| 1| GFF|385a3d24e|3913637|399146918|751923| 275.0| 7975908|321799809| 1335|
我想按 BoxId 分组并保存每个组索引的 df
就像 Boxid 321799809 一样,数据框将是
+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
|field1| field2| field3| field4| field5| field6|field7| field8| BoxId|Index|
+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
| 1| ABC|1234|385a3d24e| 3913647| 751923| 191|9977908|321799809| 1334|
| 1| GFF|385a3d24e|3913637|399146918|751923| 275.0| 6977908|321799809| 1334|
| 1| GFF|385a3d24e|3913637|399146918|751923| 275.0| 7975908|321799809| 1335|
并且文件需要保存为 321799809/1334.csv (此 csv 将有两行), 321799809/1335.csv (这将只有一行)
对于 Boxid 321799841,数据框将是
+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
|field1| field2| field3| field4| field5| field6|field7| field8| BoxId|Index|
+------+------------------+--------------------+----------+----------+-----------+------+--------+---------+---------+
| 1| DFC|385a3d24e|3913637| 40010625|751923| 357.0| 9877908|321799841| 1332|
并且文件需要保存为 321799841/1332.csv。
为此,我正在考虑编写一个自定义函数,因为保存文件逻辑是自定义的,如下所示
这个想法来自于阅读 Spark 数据集中的 groupByKey,沿着这个线程的聚合执行自定义逻辑
Sample custom function to write csv
writeCsv(df){
if(!file.exist(df.col(Index)))
csvWrite(df); // where csv fields are filled.
else{
append row
}
then use
df.groupByKey(t=>t.BoxId).mapGroups((df)=> writeCsv(df));
但是 groupByKey 的 java 语法需要两个参数,即我找不到任何示例的函数和编码器。
我试图做的是创建一个带有 POJO 的编码器,其中 pojo 类包含上述数据帧的字段。
Encoder<POJO> pojoEncoder = Encoders.bean(POJO.class);
df.as(probeEncoder).groupByKey(t->{return t.BoxId;}, pojoEncoder).mapGroups(); but it gives error at groupByKey()
任何人都可以帮我一些 groupByKey 的 java 例子吗?在爪哇。
解决方案
推荐阅读
- jenkins - 在我的 Jenkins 声明式管道中的多个代理上运行相同的任务
- javascript - 我用 GIMP 和 mtPaint 调整了大小,它不起作用。当我把它放幻灯片时,图像大小不会改变
- php - PHP服务器向swift客户端发送json响应显示此错误
- c# - 在 xamarin.forms 中创建和覆盖图像
- php - 在 PHP 中使用简单的依赖注入
- c - 用花括号定义
- javascript - 使用 Contact Form 7 Wordpress 插件中的 javascript 在提交时将隐藏字段的值更改为复选框的值
- json - 如何忽略配置单元中的顶点失败错误
- sql - 如何合并两个表以覆盖空值并按时间戳排序?
- c++ - 将 QPushButton::clicked 信号连接到插槽时出错