java - 使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件
问题描述
我有一个 .csv 文件,其中包含 id 列和几个字符串列。我想按 id 分组,然后将 string_column1 中的所有值写入文本文件(每个值都在新行上)。最后,我希望文本文件的名称为“allstrings”+id。我正在使用带有 Java 的 Apache Spark。
我尝试使用 groupBy("id").agg(collect_list("string_column1")) 但我得到“方法 collect_list(String) 对于 Main 类型未定义”。
我不知道如何使用 id 列中的不同值来命名文本文件。
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Main {
public static void main(String[] args) {
Logger.getLogger("org.apache").setLevel(Level.WARN);
SparkSession spark = SparkSession.builder()
.appName("testingSql")
.master("local[*]")
.getOrCreate();
Dataset<Row> dataset = spark.read()
.option("header", true)
.csv("src/main/resources/maininput.csv");
// make a separate .csv file for each group of strings (grouped by id),
// with each string on a new line
// and the name of the file should be "allstrings"+id
RelationalGroupedDataset result = dataset.groupBy("id")
.agg(collect_list("string_column1"))
.?????????;
spark.close();
}
}
解决方案
您可以在写入时对数据进行分区,它将为每个组创建单独的目录,id
每个文件夹的名称将采用column_name = value格式。
df.write.partitionBy("id").csv("output_directory")
然后,您可以使用org.apache.hadoop.fs._
重命名每个组目录中的文件。
推荐阅读
- python - 如何从 Python 中的 Pandas 数据框创建嵌套的 JSON 文件?
- c++ - c ++ tic tac toe非int字符导致无限循环
- python - 子类化和 keras 的 Python 问题
- linux - OpenGL EGL eglGetDisplay 不断返回 EGL 错误 0x3008(EGL_BAD_DISPLAY)
- python-3.x - 绘制布朗运动实现的平均长度
- r - 如何在 R 中解压这个 jsonlite 返回值?
- php - 如何在 / 之后的 url 中获取用户名?
- javascript - JavaScript 日期格式显示错误
- java - 如何使用 JAVA API 将任何嵌套的 json 对象插入到 elasticsearh
- javascript - 减速器函数被调用两次