首页 > 解决方案 > 使用Java Spark逐行读取大文本文件

问题描述

我正在尝试读取一个大文本文件(2 到 3 GB)。我需要逐行读取文本文件并将每一行转换为 Json 对象。我尝试使用 .collect() 和 .toLocalIterator() 来读取文本文件。collect() 适用于小文件,但不适用于大文件。我知道 .toLocalIterator() 将分散在集群周围的数据收集到一个集群中。根据文档 .toLocalIterator() 在处理大型 RDD 时无效,因为它会遇到内存问题。有没有一种有效的方法来读取多节点集群中的大型文本文件?

下面是我尝试读取文件并将每一行转换为 json 的各种尝试的方法。

public static void jsonConversion() {
    JavaRDD<String> lines = sc.textFile(path);
    String newrows = lines.first(); //<--- This reads the first line of the text file


    // Reading through with
    // tolocaliterator--------------------------------------------
     Iterator<String> newstuff = lines.toLocalIterator();
     System.out.println("line 1 " + newstuff.next());
     System.out.println("line 2 " + newstuff.next());

    // Inserting lines in a list.
    // Note: .collect() is appropriate for small files
    // only.-------------------------
    List<String> rows = lines.collect();

    // Sets loop limit based on the number on lines in text file.
    int count = (int) lines.count();
    System.out.println("Number of lines are " + count);

    // Using google's library to create a Json builder.
    GsonBuilder gsonBuilder = new GsonBuilder();
    Gson gson = new GsonBuilder().setLenient().create();

    // Created an array list to insert json objects.
    ArrayList<String> jsonList = new ArrayList<>();

    // Converting each line of the text file into a Json formatted string and
    // inserting into the array list 'jsonList'
    for (int i = 0; i <= count - 1; i++) {
        String JSONObject = gson.toJson(rows.get(i));
        Gson prettyGson = new GsonBuilder().setPrettyPrinting().create();
        String prettyJson = prettyGson.toJson(rows.get(i));
        jsonList.add(prettyJson);
    }

    // For printing out the all the json objects
    int lineNumber = 1;
    for (int i = 0; i <= count - 1; i++) {
        System.out.println("line " + lineNumber + "-->" + jsonList.get(i));
        lineNumber++;
    }

}

下面是我正在使用的库列表

//Spark Libraries
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

//Java Libraries
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

//Json Builder Libraries
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

标签: javaapache-spark

解决方案


您可以尝试在 RDD 上使用 map 功能,而不是收集所有结果。

JavaRDD<String> lines = sc.textFile(path);
JavaRDD<String> jsonList = lines.map(line -> <<all your json transformations>>)

这样,您将实现数据的分布式转换。更多关于地图功能

将数据转换为列表或数组将强制在一个节点上进行数据收集。如果要在 Spark 中实现计算分布,则需要使用 RDD 或 Dataframe 或 Dataset。


推荐阅读