首页 > 解决方案 > Java Apache Spark 将 TSV 格式转换为 JavaRDD

问题描述

我需要实现一个 Java Spark 程序来计算给定索引处具有相同列值的元组。其中命令行参数是[Input path] [column index] [output path]。输入是一个 TSV 文件,格式为:注册(入学编号、姓氏、名字、讲座、学期)

1234    Graph   Polly   Big Data    WiSe15
5678    Conda   Anna    Big Data    WiSe16
9012    Jeego   Hugh    Big Data    WiSe16
1234    Graph   Polly   Data Mining WiSe16
3456    Downe   Sid     Data Mining WiSe16
我已经开始使用配置实施设置,但在其余的任务中迷路了。
package bigdata;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.SparkConf;

public class RelCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RelCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD allRows = sc.textFile("file");
        JavaRDD line = allRows.map(l->Arrays.asList(l.toString().split("\t")));
    }
}

程序的输出应该是这种形式:

(Big Data, 3)
(Data Mining, 2)

谢谢你的帮助 :)

标签: javaapache-spark

解决方案


tsv 是一个以制表符作为分隔符的 csv 文件,因此最简单的方法是使用 Dataframe API 的csv reader读取文件。如果需要,可以稍后将数据帧转换为 rdd。

首先,获取 Spark 会话:

SparkSession spark = SparkSession.builder()
    .master("local[*]")
    .appName("SparkTest")
    .getOrCreate();

现在可以读取文件:

Dataset<Row> df = spark.
    read().
    option("delimiter", "\t").
    option("header", false).
    csv(<path to file>);

由于 csv 阅读器负责格式化业务,因此不再需要手动拆分行。

在下一步中,提取列名。由于 reader 选项header设置为false,列名将是通用名称,如_c0, _c1, ... 在此示例中,我们按第四列(基于 0 索引)分组,因此我们选择此列名。

int index = 3;
String columnname = df.schema().fieldNames()[index];

作为最后一步,我们按所选列对数据框进行分组并计算每组的行数:

df.groupBy(columnname)
    .count()
    .show();

输出是:

+-----------+-----+
|        _c3|count|
+-----------+-----+
|Data Mining|    2|
|   Big Data|    3|
+-----------+-----+

如果需要,也可以将结果转换为 rdd:

JavaRDD<Row> rdd = df.groupBy(columnname)
    .count()
    .toJavaRDD();

但通常 dataframe API 比rdd API方便得多。


推荐阅读