apache-spark - Nested Java beans in Spark SQL
Question
I am using Spark 2.1 and want to write a list of Person objects as a DataFrame. The Person class has a nested Java bean, Address.
Person:
public class Person {
    private String name;
    private Address address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Address getAddress() {
        return address;
    }

    public void setAddress(Address address) {
        this.address = address;
    }
}
Address:
public class Address {
    private String city;
    private String street;

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getStreet() {
        return street;
    }

    public void setStreet(String street) {
        this.street = street;
    }
}
I am creating the DataFrame from the List&lt;Person&gt; with the following code:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.List;

public class PersonTest {
    public static void main(String[] args) {
        Person p = new Person();
        p.setName("Tom");
        Address address = new Address();
        address.setCity("C");
        address.setStreet("001");
        p.setAddress(address);

        List<Person> persons = new ArrayList<Person>();
        persons.add(p);

        SparkSession session = SparkSession.builder()
                .master("local")
                .appName("abc")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Row> df = session.createDataFrame(persons, Person.class);
        df.printSchema();
        df.write().json("file:///D:/applog/spark/" + System.currentTimeMillis());
    }
}
But the following exception is thrown. How can I solve this?
Exception in thread "main" scala.MatchError: com.Address@1e5eb20a (of class com.Address)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:383)
at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1113)
at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1113)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1113)
at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1111)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.toStream(Iterator.scala:1322)
at scala.collection.AbstractIterator.toStream(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
at scala.collection.AbstractIterator.toSeq(Iterator.scala:1336)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:380)
Solution
The row-conversion path used by createDataFrame(beans, beanClass) does not handle a bean-typed property such as Address in Spark 2.1, which is what triggers the MatchError. Instead, create a typed Dataset with a bean encoder, which derives the schema recursively (the nested Address becomes a struct column), and convert it to a DataFrame when needed:

import org.apache.spark.sql.Encoders;

Dataset<Person> ds = session.createDataset(persons, Encoders.bean(Person.class));
Dataset<Row> df = ds.toDF();
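For reference, a minimal sketch of the corrected driver class, assuming the same Person and Address beans as above. enableHiveSupport() is dropped here because nothing in the example needs Hive; keep it if your environment relies on it.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.List;

public class PersonTest {
    public static void main(String[] args) {
        Person p = new Person();
        p.setName("Tom");
        Address address = new Address();
        address.setCity("C");
        address.setStreet("001");
        p.setAddress(address);

        List<Person> persons = new ArrayList<>();
        persons.add(p);

        SparkSession session = SparkSession.builder()
                .master("local")
                .appName("abc")
                .getOrCreate();

        // Encoders.bean inspects the getters recursively, so the nested
        // Address bean maps to a struct<city:string,street:string> column
        // instead of hitting the MatchError in CatalystTypeConverters.
        Dataset<Person> ds = session.createDataset(persons, Encoders.bean(Person.class));
        Dataset<Row> df = ds.toDF();
        df.printSchema();
        df.write().json("file:///D:/applog/spark/" + System.currentTimeMillis());

        session.stop();
    }
}
```

printSchema() should now show an address field of struct type alongside name, and the JSON output will contain the nested object, e.g. {"address":{"city":"C","street":"001"},"name":"Tom"}.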