java - Java Apache Spark 将 TSV 格式转换为 JavaRDD
问题描述
我需要实现一个 Java Spark 程序来计算给定索引处具有相同列值的元组。其中命令行参数是[Input path] [column index] [output path]。输入是一个 TSV 文件,格式为:注册(入学编号、姓氏、名字、讲座、学期)。
1234 Graph Polly Big Data WiSe15
5678 Conda Anna Big Data WiSe16
9012 Jeego Hugh Big Data WiSe16
1234 Graph Polly Data Mining WiSe16
3456 Downe Sid Data Mining WiSe16
package bigdata;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.SparkConf;
public class RelCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("RelCount");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD allRows = sc.textFile("file");
JavaRDD line = allRows.map(l->Arrays.asList(l.toString().split("\t")));
}
}
程序的输出应该是这种形式:
(Big Data, 3)
(Data Mining, 2)
谢谢你的帮助 :)
解决方案
tsv 是一个以制表符作为分隔符的 csv 文件,因此最简单的方法是使用 Dataframe API 的csv reader读取文件。如果需要,可以稍后将数据帧转换为 rdd。
首先,获取 Spark 会话:
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("SparkTest")
.getOrCreate();
现在可以读取文件:
Dataset<Row> df = spark.
read().
option("delimiter", "\t").
option("header", false).
csv(<path to file>);
由于 csv 阅读器负责格式化业务,因此不再需要手动拆分行。
在下一步中,提取列名。由于 reader 选项header
设置为false
,列名将是通用名称,如_c0
, _c1
, ... 在此示例中,我们按第四列(基于 0 索引)分组,因此我们选择此列名。
int index = 3;
String columnname = df.schema().fieldNames()[index];
作为最后一步,我们按所选列对数据框进行分组并计算每组的行数:
df.groupBy(columnname)
.count()
.show();
输出是:
+-----------+-----+
| _c3|count|
+-----------+-----+
|Data Mining| 2|
| Big Data| 3|
+-----------+-----+
如果需要,也可以将结果转换为 rdd:
JavaRDD<Row> rdd = df.groupBy(columnname)
.count()
.toJavaRDD();
但通常 dataframe API 比rdd API方便得多。
推荐阅读
- python - Python - 拆分由空格和逗号分隔的名称字符串
- api - 如何将邮递员查询参数设置为今天的日期?
- oracle-data-integrator - ODI 12c - 逆向工程 txt - 换行问题
- css - Plotly Annotate 多个箱线图
- linux - 使用 bash 从标准输入读取二进制文件
- excel - 如何自动编号直到检测到合并单元格?
- python - 请求从 ASP 更改语言
- android - 我应该在将用户密码发送到服务器之前对其进行加密吗?
- android - 在一个 XAML 元素中从 2 个 ViewModel 绑定
- elasticsearch - 嵌套类型的 Elasticsearch 布尔查询中的模型“或 TRUE”?