首页 > 解决方案 > Spark Dataset Foreach 函数不迭代

问题描述

语境

我想迭代一个Spark 数据集并为每一行更新一个 HashMap。

这是我的代码:

// At this point, I have a my_dataset variable containing 300 000 rows and 10 columns
// - my_dataset.count() == 300 000
// - my_dataset.columns().length == 10

// Declare my HashMap
HashMap<String, Vector<String>> my_map = new HashMap<String, Vector<String>>();

// Initialize the map
for(String col : my_dataset.columns())
{
    my_map.put(col, new Vector<String>());
}

// Iterate over the dataset and update the map
my_dataset.foreach( (ForeachFunction<Row>) row -> {
    for(String col : my_map.KeySet())
    {
        my_map.get(col).add(row.get(row.fieldIndex(col)).toString());
    }
});

问题

我的问题是 foreach 根本不迭代,lambda 永远不会执行,我不知道为什么。
我按照此处的说明实现了它:如何在 Spark Java 中遍历/迭代数据集?

最后,所有内部向量保持为空(因为它们被初始化),尽管数据集不是(查看给定代码示例中的第一条注释)。

我知道 foreach 永远不会迭代,因为我做了两个测试:

我不使用Java(更不用说Java lambdas)所以也许我错过了一个重要的点,但我找不到什么。

标签: javaapache-sparklambdaforeachapache-spark-dataset

解决方案


我可能有点老派,但我从不太喜欢 lambdas,因为它会变得非常复杂。

这是 a 的完整示例foreach()

package net.jgp.labs.spark.l240_foreach.l000;

import java.io.Serializable;

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ForEachBookApp implements Serializable {
  private static final long serialVersionUID = -4250231621481140775L;

  private final class BookPrinter implements ForeachFunction<Row> {
    private static final long serialVersionUID = -3680381094052442862L;

    @Override
    public void call(Row r) throws Exception {
      System.out.println(r.getString(2) + " can be bought at " + r.getString(
          4));
    }
  }

  public static void main(String[] args) {
    ForEachBookApp app = new ForEachBookApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder().appName("For Each Book").master(
        "local").getOrCreate();

    String filename = "data/books.csv";
    Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
        .option("header", "true")
        .load(filename);
    df.show();

    df.foreach(new BookPrinter());
  }
}

如您所见,此示例读取一个 CSV 文件并从数据中打印一条消息。这相当简单。

foreach()实例化一个新类,工作完成的地方。

df.foreach(new BookPrinter());

工作是在call()类的方法中完成的:

  private final class BookPrinter implements ForeachFunction<Row> {

    @Override
    public void call(Row r) throws Exception {
...
    }
  }

由于您是 Java 新手,请确保您拥有正确的签名(用于类和方法)和正确的导入。

您还可以从https://github.com/jgperrin/net.jgp.labs.spark/tree/master/src/main/java/net/jgp/labs/spark/l240_foreach/l000克隆示例。这应该可以帮助您foreach()


推荐阅读