java - 将 spark 行转换为嵌套的 java 对象
问题描述
我是 spark 新手,并试图将文本文件转换为 java 对象。我被困在了如何将多行转换为单个 java 对象的想法中。
我正在阅读下面的示例文件,用于我的学习。在第一列下面的文件中是 personId。如果同一个人有多个电话和/或多个地址,则 personId 可以重复多行。
98480|PERSON|TOM|GREER|1982|12|27
98480|PHONE|CELL|732|201|6789
98480|PHONE|HOME|732|123|9876
98480|ADDR|RES|102|JFK BLVD|PISCATAWAY|NJ|08854
98480|ADDR|OFF|211|EXCHANGE PL|JERSEY CITY|NJ|07302
98481|PERSON|LIN|JASSOY|1976|09|15
98481|PHONE|CELL|908|398|3389
98481|PHONE|HOME|917|363|2647
98481|ADDR|RES|111|JOURNAL SQ|JERSEY CITY|NJ|07704
98481|ADDR|OFF|365|DOWNTOWN NEWYORK|NEWYORK CITY|NY|10001
我正在将上面的文件转换为下面的 3 个数据框
JavaRDD<Row> personRDD = textRDD.map((String s) -> s.split("\\|"))
.filter((a) -> a[1].equalsIgnoreCase("PERSON")).map((v1) -> convertToString(v1))
.map(str -> RowFactory.create(str.split("\\|")));
Dataset<Row> personRow = session.createDataFrame(personRDD,
new StructType().add("personId", DataTypes.StringType).add("type", DataTypes.StringType)
.add("firstName", DataTypes.StringType).add("lastName", DataTypes.StringType)
.add("year", DataTypes.StringType).add("month", DataTypes.StringType)
.add("day", DataTypes.StringType));
JavaRDD<Row> phoneRDD = textRDD.map((String s) -> s.split("\\|")).filter((a) -> a[1].equalsIgnoreCase("PHONE"))
.map((v1) -> convertToString(v1)).map(str -> RowFactory.create(str.split("\\|")));
Dataset<Row> phoneRow = session.createDataFrame(phoneRDD,
new StructType().add("personId", DataTypes.StringType).add("type", DataTypes.StringType)
.add("phoneType", DataTypes.StringType).add("areaCode", DataTypes.StringType)
.add("phoneMiddle", DataTypes.StringType).add("ext", DataTypes.StringType));
JavaRDD<Row> addrRDD = textRDD.map((String s) -> s.split("\\|")).filter((a) -> a[1].equalsIgnoreCase("ADDR"))
.map((v1) -> convertToString(v1)).map(str -> RowFactory.create(str.split("\\|")));
Dataset<Row> addressRow = session.createDataFrame(addrRDD,
new StructType().add("personId", DataTypes.StringType).add("type", DataTypes.StringType)
.add("addrType", DataTypes.StringType).add("addr1", DataTypes.StringType)
.add("addr2", DataTypes.StringType).add("city", DataTypes.StringType)
.add("state", DataTypes.StringType).add("zipCode", DataTypes.StringType));
现在我有 3 个数据框,我根据 personid 加入其中。
Dataset<Row> joinedRow = personRow.join(phoneRow, personRow.col("personId").equalTo(phoneRow.col("personId")))
.join(addressRow, personRow.col("personId").equalTo(addressRow.col("personId")));
输出看起来像这样
+--------+------+---------+--------+----+-----+---+--------+-----+---------+--------+-----------+----+--------+----+--------+-----+----------------+------------+-----+-------+
|personId| type|firstName|lastName|year|month|day|personId| type|phoneType|areaCode|phoneMiddle| ext|personId|type|addrType|addr1| addr2| city|state|zipCode|
+--------+------+---------+--------+----+-----+---+--------+-----+---------+--------+-----------+----+--------+----+--------+-----+----------------+------------+-----+-------+
| 98481|PERSON| LIN| JASSOY|1976| 09| 15| 98481|PHONE| CELL| 908| 398|3389| 98481|ADDR| RES| 111| JOURNAL SQ| JERSEY CITY| NJ| 07704|
| 98481|PERSON| LIN| JASSOY|1976| 09| 15| 98481|PHONE| CELL| 908| 398|3389| 98481|ADDR| OFF| 365|DOWNTOWN NEWYORK|NEWYORK CITY| NY| 10001|
| 98481|PERSON| LIN| JASSOY|1976| 09| 15| 98481|PHONE| HOME| 917| 363|2647| 98481|ADDR| RES| 111| JOURNAL SQ| JERSEY CITY| NJ| 07704|
| 98481|PERSON| LIN| JASSOY|1976| 09| 15| 98481|PHONE| HOME| 917| 363|2647| 98481|ADDR| OFF| 365|DOWNTOWN NEWYORK|NEWYORK CITY| NY| 10001|
| 98480|PERSON| TOM| GREER|1982| 12| 27| 98480|PHONE| CELL| 732| 201|6789| 98480|ADDR| RES| 102| JFK BLVD| PISCATAWAY| NJ| 08854|
| 98480|PERSON| TOM| GREER|1982| 12| 27| 98480|PHONE| CELL| 732| 201|6789| 98480|ADDR| OFF| 211| EXCHANGE PL| JERSEY CITY| NJ| 07302|
| 98480|PERSON| TOM| GREER|1982| 12| 27| 98480|PHONE| HOME| 732| 123|9876| 98480|ADDR| RES| 102| JFK BLVD| PISCATAWAY| NJ| 08854|
| 98480|PERSON| TOM| GREER|1982| 12| 27| 98480|PHONE| HOME| 732| 123|9876| 98480|ADDR| OFF| 211| EXCHANGE PL| JERSEY CITY| NJ| 07302|
+--------+------+---------+--------+----+-----+---+--------+-----+---------+--------+-----------+----+--------+----+--------+-----+----------------+------------+-----+-------+
我正在尝试将其转换为 java 对象,但没有取得多大成功。
我的 pojos 看起来像这样。
public class Person implements Serializable
{
private String firstName;
private String lastName;
private String dateOfBirth;
private List<Address> addresses = null;
private List<Phone> phones = null;
......
public class Phone implements Serializable
{
private String phoneNumber;
public class Address implements Serializable
{
private String addr1;
private String addr2;
private String city;
private String zipCode;
....
关于我该怎么做的任何想法
解决方案
推荐阅读
- mysql - 如何在节点 js 中重新启动 mqtt 订阅者
- java - 在标记中显示信息
- hadoop - 如何将 Hadoop 版本从 3.2.1 转换为 2.2.0?
- google-analytics - 为什么onclick() gtag事件在其他事件都ok时没有出现在GA中
- r - r中的多维缩放与语料库
- angular - 对“环境”“环境”中的非导出函数的引用包含环境/环境.ts(29,10) 处的错误
- class - 是否有任何策略来划分应用 SRP 的责任?
- android - 过去 30 分钟无法在 Firebase 上检测到用户事件/活动
- reactjs - Ract JS:React JS 模态出现 W/O CSS
- java - jgit commit 抛出 PGP 异常