Spark SQL: converting null to a complex StructType

Problem description

I have a dataframe with the schema below. explode_outer works fine on the array column as long as it is not null. Is there a way to replace a null with default fields built from that column's schema, so that explode produces the same fields as the original schema, populated with default values, whenever real data is absent?

Schema:

 |-- building: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- doorAccess: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- securityGroup: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- field: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- loginDetails: struct (nullable = true)
 |    |-- additionalInfo: struct (nullable = true)
 |    |    |-- member_id: string (nullable = true)
 |    |-- sessionId: string (nullable = true)
 |-- rack: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cable: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |-- color: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: long (nullable = true)
 |-- server: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)

With this schema, the code handles the following sample data just fine:

{"server":{"name":"fifit","id":"1234"},"event":{"id":"sample1","timestamp":"2018-12-18T21:48:31.964Z"},"rack":[{"cable":[{"code":"","color":""}],"number":0,"name":""}],"building":[{"name":"name1","doorAccess":[{"key":"1","value":"2"}],"securityGroup":[{"field":"f1","name":"n1"},{"field":"f2","name":"n2"}]}],"loginDetails":{"sessionId":"SESSSION","additionalInfo":{"member_id":"1999359149"}}}

But when the input column (rack) is null, explode does not work, because it expects the column to be of array or map type:

{"server":{"name":"fifit","id":"1234"},"event":{"id":"sample1","timestamp":"2018-12-18T21:48:31.964Z"},"rack":null,"building":[{"name":"name1","doorAccess":[{"key":"1","value":"2"}],"securityGroup":[{"field":"f1","name":"n1"},{"field":"f2","name":"n2"}]}],"loginDetails":{"sessionId":"SESSSION","additionalInfo":{"member_id":"1999359149"}}}

I tried to replace the column as follows.

StructField[] structFields = new StructField[3];
StructField a1 = new StructField("cableData", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty());
StructField a2 = new StructField("number", DataTypes.StringType, true, Metadata.empty());
StructField a3 = new StructField("name", DataTypes.StringType, true, Metadata.empty());
structFields[0] = a1;
structFields[1] = a2;
structFields[2] = a3;
StructType schema = new StructType(structFields);

// Fails: cannot cast to the nested type. Not sure a WrappedArray will help me here.
Dataset<Row> m = jsonDF.withColumn("rack", when(col("rack").isNull(), array().cast(schema)));
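One hedged alternative (a sketch, not tested against this exact pipeline) is to build the default value as a typed literal with from_json and coalesce it in, instead of casting an empty array. This assumes the frame is read with an explicit schema, so the rack column carries the array type even when every value is null; from_json(Column, DataType) exists from Spark 2.2 onward.

    // Assumption: rackType mirrors the rack element schema shown earlier.
    ArrayType rackType = DataTypes.createArrayType(DataTypes.createStructType(new StructField[] {
        new StructField("cable", DataTypes.createArrayType(DataTypes.createStructType(new StructField[] {
            new StructField("code", DataTypes.StringType, true, Metadata.empty()),
            new StructField("color", DataTypes.StringType, true, Metadata.empty())
        })), true, Metadata.empty()),
        new StructField("name", DataTypes.StringType, true, Metadata.empty()),
        new StructField("number", DataTypes.LongType, true, Metadata.empty())
    }));

    // Parse a JSON literal holding the default row, then fall back to it when rack is null.
    Column defaultRack = from_json(
        lit("[{\"cable\":[{\"code\":\"\",\"color\":\"\"}],\"name\":\"\",\"number\":0}]"),
        rackType);
    Dataset<Row> withDefaults = jsonDF.withColumn("rack", coalesce(col("rack"), defaultRack));

For context, the full pipeline is below.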
public static void main(String args[]) {
        String avroSchema =
        "{\"type\":\"record\",\"name\":\"experiments\",\"namespace\":\"dummy\",\"fields\":[{\"name\":\"server\",\"type\":{\"type\":\"record\",\"name\":\"record_data\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"name of application\",\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"doc\":\"ISO 8601 UTC timestamp from the message producer\",\"default\":null}]},\"doc\":\"record information\",\"default\":{}},{\"name\":\"event\",\"type\":{\"type\":\"record\",\"name\":\"event_data\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"doc\":\"Unique id of record: current ISO timestamp with the session id appended\",\"default\":null},{\"name\":\"timestamp\",\"type\":[\"null\",\"string\"],\"doc\":\"The time this event was received by the server. (Will be set by the server, so applications should leave blank)\",\"default\":null}]},\"doc\":\"event information\",\"default\":{}},{\"name\":\"rack\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"rack_mapping\",\"fields\":[{\"name\":\"cable\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"color_data\",\"fields\":[{\"name\":\"color\",\"type\":\"string\",\"doc\":\"value of the error thrown\",\"default\":\"\"},{\"name\":\"code\",\"type\":\"string\",\"doc\":\"reason for the error thrown\",\"default\":\"\"}],\"default\":{}}}],\"doc\":\"experiments error data schema\",\"default\":null},{\"name\":\"number\",\"type\":\"int\",\"doc\":\"Qualified context number\",\"default\":0},{\"name\":\"name\",\"type\":\"string\",\"doc\":\"qualified experiment for page\",\"default\":\"\"}],\"default\":{}},\"default\":[]}],\"doc\":\"\",\"default\":null},{\"name\":\"building\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"recordViewMap\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"doc\":\"context condition qualified for device\",\"default\":\"\"},{\"name\":\"doorAccess\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"treatmentDelivered_mapping\",\"fields\":[{\"name\":\"key\",\"type\":\"string\",\"doc\":\"Key name of treatment parameter (e.g. buttonColor)\",\"default\":\"\"},{\"name\":\"value\",\"type\":\"string\",\"doc\":\"Value of treatment parameter (e.g. blue)\",\"default\":\"\"}],\"default\":{}},\"default\":[]}],\"doc\":\"\",\"default\":null},{\"name\":\"securityGroup\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"SecuryMap\",\"fields\":[{\"name\":\"field\",\"type\":\"string\",\"doc\":\"value of the error thrown\",\"default\":\"\"},{\"name\":\"name\",\"type\":\"string\",\"doc\":\"reason for the error thrown\",\"default\":\"\"}],\"default\":{}},\"default\":[]}],\"doc\":\"\",\"default\":null}],\"default\":{}},\"default\":[]}],\"doc\":\"\",\"default\":null},{\"name\":\"loginDetails\",\"type\":{\"type\":\"record\",\"name\":\"identity_data\",\"fields\":[{\"name\":\"sessionId\",\"type\":[\"null\",\"string\"],\"doc\":\"Session id as a random hex number in the range: 0xAAAAC8 - 0x1FFFFFFFFFFFE0\",\"default\":null},{\"name\":\"additionalInfo\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"additional data pertaining to notifications - can change dynamically based on notification type\",\"default\":null}]},\"doc\":\"\",\"default\":{}}]}";
        Schema schema3 = new Schema.Parser().parse(avroSchema);
        JsonAvroConverter converter = new JsonAvroConverter();

        String x =
        "{\"server\":{\"name\":\"fifit\",\"id\":\"1234\"},\"event\":{\"id\":\"sample1\",\"timestamp\":\"2018-12-18T21:48:31.964Z\"},\"rack\":[{\"cable\":[{\"code\":\"\",\"color\":\"\"}],\"number\":0,\"name\":\"\"}],\"building\":[{\"name\":\"name1\",\"doorAccess\":[{\"key\":\"1\",\"value\":\"2\"}],\"securityGroup\":[{\"field\":\"f1\",\"name\":\"n1\"},{\"field\":\"f2\",\"name\":\"n2\"}]}],\"loginDetails\":{\"sessionId\":\"SESSSION\",\"additionalInfo\":{\"member_id\":\"1999359149\"}}}";
        byte[] a = x.getBytes();

        GenericData.Record record = converter.convertToGenericDataRecord(a, schema3);

        String ds = record.toString();
        List<String> jsonData = Arrays.asList(
            ds);
        SparkSession spark = SparkSession
            .builder()
            .master("local[*]")
            .appName("server Name")
            .getOrCreate();
        Dataset<String> jsonD = spark.createDataset(jsonData, Encoders.STRING());

        Dataset<Row> jsonDF = spark.read().json(jsonD);

        Dataset<Row> y = jsonDF.select(col("server"), explode_outer(col("rack")).alias("rack"), col("building").alias("building"), col("event"), col("loginDetails"));
        Dataset<Row> z = y.select(col("server"), col("rack"), col("loginDetails"), col("event"), explode_outer(y.col("building")).alias("building"));
        Dataset<Row> a1 = z.select(col("server"), col("loginDetails"), col("event"), explode_outer(col("rack.cable")).alias("rack_cable"), col("building"),
            col("rack.name").alias("rack_name"), col("rack.number").alias("rack_number"), col("building.name").alias("building_name"), col("building.securityGroup").alias("building_securityGroup"));
        Dataset<Row> a2 = a1.select(col("server"), col("loginDetails"), col("event"), col("rack_cable"), col("rack_name"), col("rack_number"), col("building_name"), explode_outer(col("building.doorAccess")).alias("building_doorAccess"), col("building_securityGroup"));
        Dataset<Row> a3 = a2.select(col("server"), col("loginDetails"), col("event"), col("building_name"), col("rack_cable"), col("rack_name"), col("rack_number"), col("building_doorAccess"), explode_outer(col("building_securityGroup")).alias("building_securityGroup"));
        String x1 = Utils.flattenSchema(a3.schema(), null);
        // flattenSchema flattens all StructType fields into select expressions.
        System.out.println(")))))" + x1);
        a3.createOrReplaceTempView("flattenTable");
        a3.show(10, 0);
        Dataset<Row> flattenData = spark.sql("SELECT " + x1 + " FROM flattenTable");
        flattenData.toJSON().collectAsList().forEach(System.out::println);

    }
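Utils.flattenSchema is not shown in the question. A minimal sketch of what such a helper might look like, assuming it emits a comma-separated list of parent.child AS parent_child expressions for every leaf field:

    // Hypothetical reconstruction of the Utils.flattenSchema helper used above:
    // walks the schema and emits "a.b AS a_b" for every non-struct leaf field.
    public static String flattenSchema(StructType schema, String prefix) {
        StringBuilder sb = new StringBuilder();
        for (StructField field : schema.fields()) {
            String colName = (prefix == null) ? field.name() : prefix + "." + field.name();
            String piece = (field.dataType() instanceof StructType)
                ? flattenSchema((StructType) field.dataType(), colName)
                : colName + " AS " + colName.replace(".", "_");
            if (piece.isEmpty()) continue;
            if (sb.length() > 0) sb.append(", ");
            sb.append(piece);
        }
        return sb.toString();
    }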

This is what I expect when col("rack") is null:

server_id,server_name,loginDetails_additionalInfo_member_id,loginDetails_sessionId,event_id,event_timestamp,building_name,rack_cable_code,rack_cable_color,rack_name,rack_number,building_doorAccess_key,building_doorAccess_value,building_securityGroup_field,building_securityGroup_name
1234,fifit,1999359149,SESSSION,sample1,2018-12-18T21:48:31.964Z,name1,"","","",0,1,2,f1,n1
1234,fifit,1999359149,SESSSION,sample1,2018-12-18T21:48:31.964Z,name1,"","","",0,1,2,f2,n2

Tags: java, apache-spark-sql

Solution


I solved it with a UDF that converts the null column into the StructType:

        List<DataClass> cd = new ArrayList<>();
        cd.add(new DataClass());
        StructField[] structFields = new StructField[3];
        StructField[] structFields1 = new StructField[2];
        structFields1[1]=new StructField("color", DataTypes.StringType, true, Metadata.empty());
        structFields1[0]=new StructField("code", DataTypes.StringType, true, Metadata.empty());
        StructType ss = DataTypes.createStructType(structFields1);
        StructField a11 = new StructField("cable", DataTypes.createArrayType(ss), false, Metadata.empty());
        StructField a31 = new StructField("number", DataTypes.StringType, true, Metadata.empty());
        StructField a21 = new StructField("name", DataTypes.StringType, true, Metadata.empty());
        structFields[0] = a11;
        structFields[1] = a21;
        structFields[2] = a31;
        StructType s = DataTypes.createStructType(structFields);
        ArrayType arr = DataTypes.createArrayType(s);
        UserDefinedFunction mode = udf(
            (UDF1<WrappedArray<Row>, List<DataClass>>) s -> cd, arr);
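Applying the UDF might then look like the following (a sketch; the when/otherwise guard around the rack column is my assumption, not part of the original answer):

    // Replace a null rack with the default produced by the UDF; keep the original otherwise.
    Dataset<Row> fixed = jsonDF.withColumn("rack",
        when(col("rack").isNull(), mode.apply(col("rack")))
            .otherwise(col("rack")));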

/*DataClass*/

 class DataClass implements Serializable {
    public String name;
    public String number;

    List<Cable> cable = new ArrayList<>();
    public DataClass() {
        // Default instance: one empty Cable, empty name and number.
        this.cable.add(new Cable());
        this.number = "";
        this.name = "";
    }

    @Override
    public String toString(){
        return new String("Rack:" +this.name + ", "+ this.number + ", " + this.cable);
    }

    static class Cable implements Serializable {
        public String color;
        public String code;

        public Cable() {
            this.color = "";
            this.code = "";
        }

        @Override
        public String toString(){
            return new String("Cable:" +this.color + ", "+ this.code);
        }
    }
}
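Note that a Java UDF declared with an explicit return DataType must return values Catalyst can convert, and depending on the Spark version plain JavaBeans may not be accepted for a struct type. A variant of the same idea that returns Row objects instead (an assumption on my part, not part of the original answer):

    // Build the same default as Rows matching the ArrayType declared above
    // (field order: cable, name, number; needs java.util.Collections and
    // org.apache.spark.sql.RowFactory).
    UserDefinedFunction modeRows = udf(
        (UDF1<WrappedArray<Row>, List<Row>>) ignored -> {
            Row cable = RowFactory.create("", "");  // code, color
            Row rack = RowFactory.create(Collections.singletonList(cable), "", "");
            return Collections.singletonList(rack);
        }, arr);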
