首页 > 解决方案 > 使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件

问题描述

我有一个 .csv 文件,其中包含 id 列和几个字符串列。我想按 id 分组,然后将 string_column1 中的所有值写入文本文件(每个值都在新行上)。最后,我希望文本文件的名称为“allstrings”+id。我正在使用带有 Java 的 Apache Spark。

我尝试使用 groupBy("id").agg(collect_list("string_column1")) 但我得到“方法 collect_list(String) 对于 Main 类型未定义”。
我不知道如何使用 id 列中的不同值来命名文本文件。

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Main {

    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);

        SparkSession spark = SparkSession.builder()
                .appName("testingSql")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> dataset = spark.read()
        .option("header", true)
        .csv("src/main/resources/maininput.csv");

        // make a separate .csv file for each group of strings (grouped by id),
        // with each string on a new line
        // and the name of the file should be "allstrings"+id
        RelationalGroupedDataset result = dataset.groupBy("id")
                .agg(collect_list("string_column1"))
                .?????????;



        spark.close();
    }

}

标签: javaapache-spark

解决方案


您可以在写入时对数据进行分区,它将为每个组创建单独的目录,id 每个文件夹的名称将采用column_name = value格式。

df.write.partitionBy("id").csv("output_directory")

然后,您可以使用org.apache.hadoop.fs._重命名每个组目录中的文件。


推荐阅读