首页 > 解决方案 > 将 spark 行转换为嵌套的 java 对象

问题描述

我是 spark 新手,并试图将文本文件转换为 java 对象。我被困在了如何将多行转换为单个 java 对象的想法中。

我正在阅读下面的示例文件,用于我的学习。在第一列下面的文件中是 personId。如果同一个人有多个电话和/或多个地址,则 personId 可以重复多行。

98480|PERSON|TOM|GREER|1982|12|27
98480|PHONE|CELL|732|201|6789
98480|PHONE|HOME|732|123|9876
98480|ADDR|RES|102|JFK BLVD|PISCATAWAY|NJ|08854
98480|ADDR|OFF|211|EXCHANGE PL|JERSEY CITY|NJ|07302
98481|PERSON|LIN|JASSOY|1976|09|15
98481|PHONE|CELL|908|398|3389
98481|PHONE|HOME|917|363|2647
98481|ADDR|RES|111|JOURNAL SQ|JERSEY CITY|NJ|07704
98481|ADDR|OFF|365|DOWNTOWN NEWYORK|NEWYORK CITY|NY|10001

我正在将上面的文件转换为下面的 3 个数据框

JavaRDD<Row> personRDD = textRDD.map((String s) -> s.split("\\|"))
                .filter((a) -> a[1].equalsIgnoreCase("PERSON")).map((v1) -> convertToString(v1))
                .map(str -> RowFactory.create(str.split("\\|")));

        Dataset<Row> personRow = session.createDataFrame(personRDD,
                new StructType().add("personId", DataTypes.StringType).add("type", DataTypes.StringType)
                        .add("firstName", DataTypes.StringType).add("lastName", DataTypes.StringType)
                        .add("year", DataTypes.StringType).add("month", DataTypes.StringType)
                        .add("day", DataTypes.StringType));

        JavaRDD<Row> phoneRDD = textRDD.map((String s) -> s.split("\\|")).filter((a) -> a[1].equalsIgnoreCase("PHONE"))
                .map((v1) -> convertToString(v1)).map(str -> RowFactory.create(str.split("\\|")));

        Dataset<Row> phoneRow = session.createDataFrame(phoneRDD,
                new StructType().add("personId", DataTypes.StringType).add("type", DataTypes.StringType)
                        .add("phoneType", DataTypes.StringType).add("areaCode", DataTypes.StringType)
                        .add("phoneMiddle", DataTypes.StringType).add("ext", DataTypes.StringType));

        JavaRDD<Row> addrRDD = textRDD.map((String s) -> s.split("\\|")).filter((a) -> a[1].equalsIgnoreCase("ADDR"))
                .map((v1) -> convertToString(v1)).map(str -> RowFactory.create(str.split("\\|")));

        Dataset<Row> addressRow = session.createDataFrame(addrRDD,
                new StructType().add("personId", DataTypes.StringType).add("type", DataTypes.StringType)
                        .add("addrType", DataTypes.StringType).add("addr1", DataTypes.StringType)
                        .add("addr2", DataTypes.StringType).add("city", DataTypes.StringType)
                        .add("state", DataTypes.StringType).add("zipCode", DataTypes.StringType));

现在我有 3 个数据框,我根据 personid 加入其中。

Dataset<Row> joinedRow = personRow.join(phoneRow, personRow.col("personId").equalTo(phoneRow.col("personId")))
                .join(addressRow, personRow.col("personId").equalTo(addressRow.col("personId")));

输出看起来像这样

+--------+------+---------+--------+----+-----+---+--------+-----+---------+--------+-----------+----+--------+----+--------+-----+----------------+------------+-----+-------+
|personId|  type|firstName|lastName|year|month|day|personId| type|phoneType|areaCode|phoneMiddle| ext|personId|type|addrType|addr1|           addr2|        city|state|zipCode|
+--------+------+---------+--------+----+-----+---+--------+-----+---------+--------+-----------+----+--------+----+--------+-----+----------------+------------+-----+-------+
|   98481|PERSON|      LIN|  JASSOY|1976|   09| 15|   98481|PHONE|     CELL|     908|        398|3389|   98481|ADDR|     RES|  111|      JOURNAL SQ| JERSEY CITY|   NJ|  07704|
|   98481|PERSON|      LIN|  JASSOY|1976|   09| 15|   98481|PHONE|     CELL|     908|        398|3389|   98481|ADDR|     OFF|  365|DOWNTOWN NEWYORK|NEWYORK CITY|   NY|  10001|
|   98481|PERSON|      LIN|  JASSOY|1976|   09| 15|   98481|PHONE|     HOME|     917|        363|2647|   98481|ADDR|     RES|  111|      JOURNAL SQ| JERSEY CITY|   NJ|  07704|
|   98481|PERSON|      LIN|  JASSOY|1976|   09| 15|   98481|PHONE|     HOME|     917|        363|2647|   98481|ADDR|     OFF|  365|DOWNTOWN NEWYORK|NEWYORK CITY|   NY|  10001|
|   98480|PERSON|      TOM|   GREER|1982|   12| 27|   98480|PHONE|     CELL|     732|        201|6789|   98480|ADDR|     RES|  102|        JFK BLVD|  PISCATAWAY|   NJ|  08854|
|   98480|PERSON|      TOM|   GREER|1982|   12| 27|   98480|PHONE|     CELL|     732|        201|6789|   98480|ADDR|     OFF|  211|     EXCHANGE PL| JERSEY CITY|   NJ|  07302|
|   98480|PERSON|      TOM|   GREER|1982|   12| 27|   98480|PHONE|     HOME|     732|        123|9876|   98480|ADDR|     RES|  102|        JFK BLVD|  PISCATAWAY|   NJ|  08854|
|   98480|PERSON|      TOM|   GREER|1982|   12| 27|   98480|PHONE|     HOME|     732|        123|9876|   98480|ADDR|     OFF|  211|     EXCHANGE PL| JERSEY CITY|   NJ|  07302|
+--------+------+---------+--------+----+-----+---+--------+-----+---------+--------+-----------+----+--------+----+--------+-----+----------------+------------+-----+-------+

我正在尝试将其转换为 java 对象,但没有取得多大成功。

我的 pojos 看起来像这样。

public class Person implements Serializable
{
    private String firstName;
    private String lastName;
    private String dateOfBirth;
    private List<Address> addresses = null;
    private List<Phone> phones = null;
......

public class Phone implements Serializable
{

    private String phoneNumber;



public class Address implements Serializable
{
    private String addr1;
    private String addr2;
    private String city;
    private String zipCode;
....

关于我该怎么做的任何想法

标签: javaapache-sparkapache-spark-sql

解决方案


推荐阅读