MongoDB + Spark in a Java project: unable to use Datasets

Problem description

I am developing a project with MongoDB and Spark in Java. In this application I simply want to read data from a MongoDB database and store it in a Dataset so that I can work with it.

However, I am running into trouble, because I cannot use the Dataset in which I try to store the information.

This is my code:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.mongodb.spark.MongoSpark;

public class MongoSparkTFG {

    public void mongoSparkExe() {

        SparkSession spark = SparkSession.builder().master("local").appName("MongoSparkConnector")
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/Nasa.satelites")
                .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/Nasa.satelites").getOrCreate();
        spark.sparkContext().setLogLevel("ERROR");

        // Create a JavaSparkContext using the SparkSession's SparkContext object
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        System.out.println("Starting the example process");

        /* Start Example: Read data from MongoDB ************************/

        // Load the collection configured in spark.mongodb.input.uri and convert it to a DataFrame
        Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
        implicitDS.printSchema();
        implicitDS.createOrReplaceTempView("Sats");
        Dataset<Row> centenarians = spark.sql("SELECT Tiempo FROM Sats WHERE latitud > 9");
        centenarians.show();

        /* End Example **************************************************/

        jsc.close();
    }

    public static void main(String[] args) {
        MongoSparkTFG xd = new MongoSparkTFG();
        xd.mongoSparkExe();
    }
}
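
As a side note, the same read can also be expressed through Spark's DataFrameReader, which reuses the spark.mongodb.input.uri set on the session and avoids creating the JavaSparkContext by hand. A minimal sketch (the class and method names are only illustrative, not from the original post):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative helper: read the collection configured via spark.mongodb.input.uri
// into a Dataset<Row> using the connector's DataSource implementation.
public class MongoReadViaDataFrameReader {

    public static Dataset<Row> load(SparkSession spark) {
        return spark.read()
                .format("com.mongodb.spark.sql.DefaultSource") // MongoDB Spark connector data source
                .load();
    }
}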

These are my dependencies in pom.xml:

<properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
    </dependency>

    <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.11</artifactId> 
        <version>2.0.0</version> </dependency> -->

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
        <version>2.3.2</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.8.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.directory.server</groupId>
        <artifactId>kerberos-client</artifactId>
        <version>2.0.0-M16</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-server-web-proxy</artifactId>
        <version>3.1.1</version>
    </dependency>


    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>

</dependencies>

This is the schema I have:

 |-- Num: integer (nullable = true)
 |-- Tiempo: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- latitud: double (nullable = true)
 |-- longitud: double (nullable = true)
 |-- wind_Speed: double (nullable = true)
 |-- wind_dir: double (nullable = true)

The first error I get when I try to show the Dataset:

   2018-10-01 04:18:44,863 ERROR [Executor task launch worker for task 1] executor.Executor (Logging.scala:logError(91)) - Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NoSuchFieldError: DECIMAL128
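
For context, BsonType.DECIMAL128 only exists in the MongoDB Java driver from version 3.4 onward, so a NoSuchFieldError here usually points to an older org.mongodb bson/driver jar being picked up at runtime instead of the 3.8.2 declared in the pom (for example pulled in transitively; mvn dependency:tree -Dincludes=org.mongodb lists the candidates). A minimal diagnostic sketch, with a purely illustrative class name, that prints which jar actually provides org.bson at runtime:

import org.bson.BsonType;

// Illustrative diagnostic: shows which jar supplies org.bson.BsonType on the
// runtime classpath. If it is not the bson/mongo-java-driver 3.8.2 jar, an older
// BSON library is shadowing it and DECIMAL128 (added in driver 3.4) is missing.
public class BsonClasspathCheck {

    public static void main(String[] args) {
        System.out.println(BsonType.class.getProtectionDomain().getCodeSource().getLocation());
        // Fails with NoSuchFieldError against a pre-3.4 bson jar, just like the Spark job.
        System.out.println(BsonType.DECIMAL128);
    }
}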

When I call createOrReplaceTempView on this Dataset and then run a SELECT against that temp view, I get this error:

 2018-10-01 04:22:49,978 ERROR [main] rdd.MongoRDD (Logging.scala:logError(70)) - 
-----------------------------
WARNING: Partitioning failed.
-----------------------------

Partitioning using the 'DefaultMongoPartitioner$' failed.

Please check the stacktrace to determine the cause of the failure or check the Partitioner API documentation.
Note: Not all partitioners are suitable for all toplogies and not all partitioners support views.%n
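
For reference, the connector lets you pick the partitioner per read (the spark.mongodb.input.partitioner setting, or the equivalent partitioner key on a ReadConfig). A minimal sketch, assuming the failure really does come from the default partitioner and using an illustrative helper name:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;

// Illustrative helper: load the collection with an explicitly chosen partitioner
// instead of the DefaultMongoPartitioner that failed above.
public class MongoReadWithPartitioner {

    public static Dataset<Row> load(JavaSparkContext jsc) {
        Map<String, String> overrides = new HashMap<>();
        // Same setting as spark.mongodb.input.partitioner, without the prefix.
        overrides.put("partitioner", "MongoSamplePartitioner");
        ReadConfig readConfig = ReadConfig.create(jsc).withOptions(overrides);
        return MongoSpark.load(jsc, readConfig).toDF();
    }
}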

What exactly am I doing wrong?

Thanks for your help. Regards.

Tags: mongodb, apache-spark, apache-spark-sql, apache-spark-dataset, mongo-java-driver

Solution

