java - Unresolved compilation problem: the method map is ambiguous for the type Dataset
Problem description
I am writing Java client code for Apache Spark 3.0.1. First, the pom.xml:
<dependencies>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.2</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-csv</artifactId>
    <version>2.11.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
  </dependency>
  <dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.12</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
    <version>2.12.1</version>
  </dependency>
  <dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.12.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.0</version>
  </dependency>
</dependencies>
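A side note on these dependencies: the pom mixes three Jackson versions (jackson-databind 2.10.2, jackson-dataformat-csv 2.11.2, jackson-datatype-jsr310 2.12.1), while Spark 3.1.0 also bundles its own Jackson. Mismatched Jackson module versions are a common source of "Incompatible Jackson version" errors at runtime. A hedged suggestion (the exact version to pin should match what your Spark distribution bundles; 2.10.x is assumed here) is to align all Jackson artifacts through the official `jackson-bom`:

```xml
<dependencyManagement>
  <dependencies>
    <!-- Pins every com.fasterxml.jackson.* artifact to one consistent version -->
    <dependency>
      <groupId>com.fasterxml.jackson</groupId>
      <artifactId>jackson-bom</artifactId>
      <version>2.10.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
```

With this in place, the individual Jackson dependencies can omit their `<version>` elements and inherit the managed one.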
I wrote the Java client code using the Spark Structured Streaming API:
SparkSession spark = SparkSession.builder()
    .master("local[*]")
    .appName("KafkaMongo_StrctStream")
    .getOrCreate();

Dataset<Row> inputDF = spark.read()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topicForMongoDB")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)");

Encoder<Document> mongoEncode = Encoders.bean(Document.class);

Dataset<Row> tempDF = inputDF.map(row -> { // this map call throws the exception
    String[] parameters = new String[row.mkString().split(",").length];
    CsvMapper csvMapper = new CsvMapper();
    parameters = csvMapper.readValue(row.mkString(), String[].class);
    DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE;
    EntityMongoDB data = new EntityMongoDB();//LocalDate.parse(parameters[2], formatter), Float.valueOf(parameters[3]), parameters[4], parameters[5], parameters[6], parameters[7], parameters[8], parameters[9]);
    String jsonInString = csvMapper.writeValueAsString(data);
    Document doc = new Document(Document.parse(jsonInString));
    return doc;
}, mongoEncode).toDF();
But this code does not compile, failing with the following exception:
Exception in thread "main" java.lang.Error: Unresolved compilation problem:
The method map(Function1<Row,Document>, Encoder<Document>) is ambiguous for the type Dataset<Row>
I cannot see anything wrong with this code, because it ran without exceptions on Apache Spark 2.4. Is this unresolved compilation problem caused by the Apache Spark version? Please tell me how to solve it.
= Update =
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import com.aaa.etl.pojo.EntityMongoDB;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
For reference, I am also attaching the EntityMongoDB class source:
import java.io.Serializable;
import java.time.LocalDate;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EntityMongoDB implements Serializable {
    @JsonFormat(pattern="yyyy-MM-dd")
    @JsonDeserialize(using = LocalDateDeserializer.class)
    private LocalDate date;
    private float value;
    private String id;
    private String title;
    private String state;
    private String frequency_short;
    private String units_short;
    private String seasonal_adjustment_short;
}
Solution
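In Spark 3.x builds for Scala 2.12, `scala.Function1` compiles to an interface with a single abstract method, so a Java lambda now matches both `Dataset.map(Function1<T, U>, Encoder<U>)` and `Dataset.map(MapFunction<T, U>, Encoder<U>)`, and the compiler cannot choose an overload (on older Scala 2.11 builds, only `MapFunction` was a valid lambda target, which is why the same code compiled on Spark 2.4). The usual fix is to cast the lambda explicitly to `org.apache.spark.api.java.function.MapFunction`. The self-contained sketch below reproduces the same ambiguity with a hypothetical `transform` overload pair (the class and method names are illustrative, not Spark's API) and shows how the cast resolves it:

```java
import java.util.function.Function;

public class OverloadAmbiguityDemo {
    // Local stand-in mirroring org.apache.spark.api.java.function.MapFunction
    public interface MapFunction<T, U> {
        U call(T value) throws Exception;
    }

    // Two overloads that both accept a functional interface, mirroring
    // Dataset.map(Function1<T,U>, Encoder<U>) vs. Dataset.map(MapFunction<T,U>, Encoder<U>)
    public static <U> U transform(Function<String, U> f, String input) {
        return f.apply(input);
    }

    public static <U> U transform(MapFunction<String, U> f, String input) throws Exception {
        return f.call(input);
    }

    public static void main(String[] args) throws Exception {
        // transform(s -> s.length(), "spark");  // does NOT compile: "ambiguous method call"
        // An explicit cast makes the lambda's target type unambiguous:
        int n = transform((MapFunction<String, Integer>) s -> s.length(), "spark");
        System.out.println(n); // prints 5
    }
}
```

Applied to the question's code, the fix is a single cast plus one import (`import org.apache.spark.api.java.function.MapFunction;`): write `inputDF.map((MapFunction<Row, Document>) row -> { ... }, mongoEncode)` instead of passing the bare lambda.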