Unresolved compilation problem: the method map is ambiguous for the type Dataset

Problem Description

I am writing Java client code against Apache Spark 3.0.1. First, here is my pom.xml:

<dependencies>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.2</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-csv</artifactId>
        <version>2.11.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>2.12.1</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.12.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
  </dependencies>

I wrote the Java client code using the Spark Structured Streaming API:

SparkSession spark = SparkSession.builder().master("local[*]").appName("KafkaMongo_StrctStream").getOrCreate();
        
Dataset<Row> inputDF = spark.read()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topicForMongoDB")
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(value AS STRING)");
        
Encoder<Document> mongoEncode = Encoders.bean(Document.class);
Dataset<Row> tempDF = inputDF.map(row -> {    //map function throws the exception.
        String[] parameters = new String[row.mkString().split(",").length];
            
        CsvMapper csvMapper = new CsvMapper();
        parameters = csvMapper.readValue(row.mkString(), String[].class);
            
        DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE;
        EntityMongoDB data = new EntityMongoDB();//LocalDate.parse(parameters[2], formatter), Float.valueOf(parameters[3]), parameters[4], parameters[5], parameters[6], parameters[7], parameters[8], parameters[9]);
            
        String jsonInString = csvMapper.writeValueAsString(data);

        Document doc = new Document(Document.parse(jsonInString));      
        return doc;
    }, mongoEncode).toDF();

However, this code fails to run with the following exception:

Exception in thread "main" java.lang.Error: Unresolved compilation problem: 
    The method map(Function1<Row,Document>, Encoder<Document>) is ambiguous for the type Dataset<Row>

I cannot see anything wrong with this code, because it ran without exceptions on Apache Spark 2.4. Is this unresolved compilation problem caused by the Apache Spark version upgrade? Please tell me how to fix it.

= Update =

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import com.aaa.etl.pojo.EntityMongoDB;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;

For your reference, I have also attached the source of the EntityMongoDB class:

import java.io.Serializable;
import java.time.LocalDate;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EntityMongoDB implements Serializable {

    @JsonFormat(pattern="yyyy-MM-dd")
    @JsonDeserialize(using = LocalDateDeserializer.class)
    private LocalDate date;
    
    private float value;
    
    private String id;
    
    private String title;
    
    private String state;
    
    private String frequency_short;
    
    private String units_short;
    
    private String seasonal_adjustment_short;
}

Tags: java, apache-spark, spark-structured-streaming

Solution
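Spark's Dataset.map is overloaded: it accepts either Scala's Function1 or Spark's Java-friendly MapFunction (org.apache.spark.api.java.function.MapFunction), each paired with an Encoder. Spark 3.x is built against Scala 2.12, where Function1 compiles to a Java functional interface, so a bare Java lambda now matches both overloads and the compiler reports the call as ambiguous; under the Scala 2.11 builds typically used with Spark 2.4 the lambda only matched MapFunction, which is why the same code compiled there. The commonly suggested fix is to cast the lambda explicitly, e.g. inputDF.map((MapFunction<Row, Document>) row -> { ... }, mongoEncode). The ambiguity itself can be reproduced without Spark; below is a minimal self-contained sketch in which Function1, MapFunction, and Dataset are simplified stand-ins for the real Scala/Spark types, not the actual API:

```java
// Stand-ins for Scala's Function1 and Spark's MapFunction: since both are
// single-method (functional) interfaces, a bare lambda matches either one.
interface Function1<T, R> { R apply(T t); }
interface MapFunction<T, R> { R call(T t); }

// Stand-in for Dataset with the same kind of overload pair as Dataset.map.
class Dataset<T> {
    <R> String map(Function1<T, R> f) { return "scala"; }
    <R> String map(MapFunction<T, R> f) { return "java"; }
}

public class AmbiguityDemo {
    public static void main(String[] args) {
        Dataset<String> ds = new Dataset<>();

        // ds.map(s -> s.length());
        // would not compile: both overloads are applicable, so the call is
        // ambiguous, just like Dataset.map in Spark 3.x with a bare lambda.

        // The explicit cast pins the lambda to one functional interface,
        // so overload resolution picks the MapFunction variant.
        String which = ds.map((MapFunction<String, Integer>) s -> s.length());
        System.out.println(which);
    }
}
```

Applied to the code in the question, the same cast (plus an import of org.apache.spark.api.java.function.MapFunction) should resolve the compilation problem: inputDF.map((MapFunction<Row, Document>) row -> { ... }, mongoEncode).toDF().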
