java - Spark Sql 将 null 转换为复杂的 StructType
问题描述
我有具有以下架构的数据框,当数组列不为空时,explode_outer 可以正常工作。有没有办法在该列为 null 时,用带默认值的模式字段替换它?这样即使没有数据,explode 也能像有数据时一样工作,null 字段取原始模式中定义的默认值。
架构:
|-- building: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- doorAccess: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- key: string (nullable = true)
| | | | |-- value: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- securityGroup: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- field: string (nullable = true)
| | | | |-- name: string (nullable = true)
|-- event: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- timestamp: string (nullable = true)
|-- loginDetails: struct (nullable = true)
| |-- additionalInfo: struct (nullable = true)
| | |-- member_id: string (nullable = true)
| |-- sessionId: string (nullable = true)
|-- rack: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- cable: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- code: string (nullable = true)
| | | | |-- color: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- number: long (nullable = true)
|-- server: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- name: string (nullable = true)
使用此模式代码可以很好地处理示例数据。
{"server":{"name":"fifit","id":"1234"},"event":{"id":"sample1","timestamp":"2018-12-18T21:48:31.964Z"},"rack":[{"cable":[{"code":"","color":""}],"number":0,"name":""}],"building":[{"name":"name1","doorAccess":[{"key":"1","value":"2"}],"securityGroup":[{"field":"f1","name":"n1"},{"field":"f2","name":"n2"}]}],"loginDetails":{"sessionId":"SESSSION","additionalInfo":{"member_id":"1999359149"}}}
但如果输入数据中的 rack 列收到 null,explode 就会失败,因为它要求该列是数组或映射类型。
{"server":{"name":"fifit","id":"1234"},"event":{"id":"sample1","timestamp":"2018-12-18T21:48:31.964Z"},"rack":null,"building":[{"name":"name1","doorAccess":[{"key":"1","value":"2"}],"securityGroup":[{"field":"f1","name":"n1"},{"field":"f2","name":"n2"}]}],"loginDetails":{"sessionId":"SESSSION","additionalInfo":{"member_id":"1999359149"}}}
我尝试使用以下替换列。
// Schema of a single "rack" element: struct<cableData: array<string>, number, name>.
StructField[] structFields = new StructField[3];
StructField a1 = new StructField("cableData", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty());
StructField a2 = new StructField("number", DataTypes.StringType, true, Metadata.empty());
StructField a3 = new StructField("name", DataTypes.StringType, true, Metadata.empty());
structFields[0] = a1;
structFields[1] = a2;
structFields[2] = a3;
StructType schema = new StructType(structFields);
// "rack" is an ARRAY of structs, so the replacement value must be cast to the
// ArrayType that wraps the struct schema. Casting array() directly to the
// StructType is what produced the "cannot cast to nested type" error.
ArrayType rackType = DataTypes.createArrayType(schema);
// .otherwise(...) is required: a when(...) with no otherwise yields null for
// every non-matching row, which would wipe out the non-null rack values.
Dataset<Row> m = jsonDF.withColumn("rack",
        when(col("rack").isNull(), array().cast(rackType))
                .otherwise(col("rack")));
public static void main(String args[]) {
String avroSchema =
"{\"type\":\"record\",\"name\":\"experiments\",\"namespace\":\"dummy\",\"fields\":[{\"name\":\"server\",\"type\":{\"type\":\"record\",\"name\":\"record_data\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"name of application\",\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"doc\":\"ISO 8601 UTC timestamp from the message producer\",\"default\":null}]},\"doc\":\"record information\",\"default\":{}},{\"name\":\"event\",\"type\":{\"type\":\"record\",\"name\":\"event_data\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"doc\":\"Unique id of record: current ISO timestamp with the session id appended\",\"default\":null},{\"name\":\"timestamp\",\"type\":[\"null\",\"string\"],\"doc\":\"The time this event was received by the server. (Will be set by the server, so applications should leave blank)\",\"default\":null}]},\"doc\":\"event information\",\"default\":{}},{\"name\":\"rack\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"rack_mapping\",\"fields\":[{\"name\":\"cable\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"color_data\",\"fields\":[{\"name\":\"color\",\"type\":\"string\",\"doc\":\"value of the error thrown\",\"default\":\"\"},{\"name\":\"code\",\"type\":\"string\",\"doc\":\"reason for the error thrown\",\"default\":\"\"}],\"default\":{}}}],\"doc\":\"experiments error data schema\",\"default\":null},{\"name\":\"number\",\"type\":\"int\",\"doc\":\"Qualified context number\",\"default\":0},{\"name\":\"name\",\"type\":\"string\",\"doc\":\"qualified experiment for page\",\"default\":\"\"}],\"default\":{}},\"default\":[]}],\"doc\":\"\",\"default\":null},{\"name\":\"building\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"recordViewMap\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"doc\":\"context condition qualified for 
device\",\"default\":\"\"},{\"name\":\"doorAccess\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"treatmentDelivered_mapping\",\"fields\":[{\"name\":\"key\",\"type\":\"string\",\"doc\":\"Key name of treatment parameter (e.g. buttonColor)\",\"default\":\"\"},{\"name\":\"value\",\"type\":\"string\",\"doc\":\"Value of treatment parameter (e.g. blue)\",\"default\":\"\"}],\"default\":{}},\"default\":[]}],\"doc\":\"\",\"default\":null},{\"name\":\"securityGroup\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"SecuryMap\",\"fields\":[{\"name\":\"field\",\"type\":\"string\",\"doc\":\"value of the error thrown\",\"default\":\"\"},{\"name\":\"name\",\"type\":\"string\",\"doc\":\"reason for the error thrown\",\"default\":\"\"}],\"default\":{}},\"default\":[]}],\"doc\":\"\",\"default\":null}],\"default\":{}},\"default\":[]}],\"doc\":\"\",\"default\":null},{\"name\":\"loginDetails\",\"type\":{\"type\":\"record\",\"name\":\"identity_data\",\"fields\":[{\"name\":\"sessionId\",\"type\":[\"null\",\"string\"],\"doc\":\"Session id as a random hex number in the range: 0xAAAAC8 - 0x1FFFFFFFFFFFE0\",\"default\":null},{\"name\":\"additionalInfo\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"additional data pertaining to notifications - can change dynamically based on notification type\",\"default\":null}]},\"doc\":\"\",\"default\":{}}]}";
Schema schema3 = Schema.parse(avroSchema);
JsonAvroConverter converter = new JsonAvroConverter();
String x =
"{\"server\":{\"name\":\"fifit\",\"id\":\"1234\"},\"event\":{\"id\":\"sample1\",\"timestamp\":\"2018-12-18T21:48:31.964Z\"},\"rack\":[{\"cable\":[{\"code\":\"\",\"color\":\"\"}],\"number\":0,\"name\":\"\"}],\"building\":[{\"name\":\"name1\",\"doorAccess\":[{\"key\":\"1\",\"value\":\"2\"}],\"securityGroup\":[{\"field\":\"f1\",\"name\":\"n1\"},{\"field\":\"f2\",\"name\":\"n2\"}]}],\"loginDetails\":{\"sessionId\":\"SESSSION\",\"additionalInfo\":{\"member_id\":\"1999359149\"}}}";
byte[] a = x.getBytes();
GenericData.Record record = converter.convertToGenericDataRecord(a, schema3);
String ds = record.toString();
List<String> jsonData = Arrays.asList(
ds);
SparkSession spark = SparkSession
.builder()
.master("local[*]")
.appName("server Name")
.getOrCreate();
Dataset<String> jsonD = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> jsonDF = spark.read().json(jsonD);
Dataset<Row> y = jsonDF.select(col("server"), explode_outer(col("rack")).alias("rack"), col("building").alias("building"), col("event"), col("loginDetails"));
Dataset<Row> z = y.select(col("server"), col("rack"), col("loginDetails"), col("event"), explode_outer(y.col("building")).alias("building"));
Dataset<Row> a1 = z.select(col("server"), col("loginDetails"), col("event"), explode_outer(col("rack.cable")).alias("rack_cable"), col("building"),
col("rack.name").alias("rack_name"), col("rack.number").alias("rack_number"), col("building.name").alias("building_name"), col("building.securityGroup").alias("building_securityGroup"));
Dataset<Row> a2 = a1.select(col("server"), col("loginDetails"), col("event"), col("rack_cable"), col("rack_name"), col("rack_number"), col("building_name"), explode_outer(col("building.doorAccess")).alias("building_doorAccess"), col("building_securityGroup"));
Dataset<Row> a3 = a2.select(col("server"), col("loginDetails"), col("event"), col("building_name"), col("rack_cable"), col("rack_name"), col("rack_number"), col("building_doorAccess"), explode_outer(col("building_securityGroup")).alias("building_securityGroup"));
String x1 = Utils.flattenSchema(a3.schema(), null);
# FlattenSchema will flatten all structtype fields.
System.out.println(")))))" + x1);
a3.registerTempTable("flattenTable");
System.out.println(a3.showString(10, 0, false));
Dataset<Row> flattenData = spark.sql("SELECT " + x1 + " FROM flattenTable");
System.out.println(flattenData.toJSON().toString());
}
如果 col("rack") 为空,这就是我所期待的。
server_id,server_name,loginDetails_additionalInfo_member_id,loginDetails_sessionId,event_id,event_timestamp,building_name,rack_cable_code,rack_cable_color,rack_name,rack_number,building_doorAccess_key,building_doorAccess_value,building_securityGroup_field,building_securityGroup_name
1234,fifit,1999359149,SESSSION,sample1,2018-12-18T21:48:31.964Z,name1,"","","",0,1,2,f1,n1
1234,fifit,1999359149,SESSSION,sample1,2018-12-18T21:48:31.964Z,name1,"","","",0,1,2,f2,n2
解决方案
我使用 UDF 将空列转换为 StructType 解决了它
// Default replacement value: a single rack entry whose fields carry the
// schema defaults (empty strings, one default cable element).
List<DataClass> cd = new ArrayList<>();
cd.add(new DataClass());
// Build the Spark schema the UDF returns:
// array<struct<cable: array<struct<code, color>>, name, number>>.
StructField[] structFields = new StructField[3];
StructField[] structFields1 = new StructField[2];
structFields1[0] = new StructField("code", DataTypes.StringType, true, Metadata.empty());
structFields1[1] = new StructField("color", DataTypes.StringType, true, Metadata.empty());
StructType ss = DataTypes.createStructType(structFields1);
StructField a11 = new StructField("cable", DataTypes.createArrayType(ss), false, Metadata.empty());
StructField a21 = new StructField("name", DataTypes.StringType, true, Metadata.empty());
StructField a31 = new StructField("number", DataTypes.StringType, true, Metadata.empty());
structFields[0] = a11;
structFields[1] = a21;
structFields[2] = a31;
StructType s = DataTypes.createStructType(structFields);
ArrayType arr = DataTypes.createArrayType(s);
// The lambda must be given an explicit UDF1 target type: a bare
// (WrappedArray s) -> cd lambda has no functional-interface context, so the
// udf(...) overload cannot be resolved (and raw WrappedArray is avoided).
mode = udf((UDF1<WrappedArray<Row>, List<DataClass>>) in -> cd, arr);
/*DataClass*/
/**
 * Default-valued replacement for one "rack" element: empty name/number and a
 * single default {@link Cable}, mirroring the Avro schema defaults so a later
 * explode still produces a row.
 */
class DataClass implements Serializable {
    public String name;
    public String number;
    List<Cable> cable = new ArrayList<>();

    public DataClass() {
        // One default Cable so explode() on "cable" yields exactly one row.
        // (The previous version also built an unused local List and Row here —
        // dead code, removed.)
        this.cable.add(new Cable());
        this.number = "";
        this.name = "";
    }

    @Override
    public String toString() {
        // Plain concatenation — no need to wrap the result in new String(...).
        return "Rack:" + this.name + ", " + this.number + ", " + this.cable;
    }

    /** Default-valued cable entry: empty color and code. */
    static class Cable implements Serializable {
        public String color;
        public String code;

        public Cable() {
            this.color = "";
            this.code = "";
        }

        @Override
        public String toString() {
            return "Cable:" + this.color + ", " + this.code;
        }
    }
}
推荐阅读
- ruby-on-rails - 调用错误进行验证时,nil:NilClass 的未定义方法“错误”
- xpages - 当我尝试从在 Notes 客户端中打开的 Xpage 页面下载文件时,Notes 崩溃
- discord.js - 时间命令说“NAN”:歌曲持续时间的“NaN”
- docker - 在 Ubuntu 机器上是否需要 Docker Machine?
- python - Python 3 - 如何从命令行应用程序中删除空行
- excel - 删除行时保持公式不变
- javascript - ReferenceError: 在初始化之前不能访问词法声明`X'
- laravel - Laravel Cashier 新订阅 Stripe 抛出“无法确定请求哪个 URL:”错误
- android - 在 Flutter 上退出原生 Android 活动需要两次后按
- r - 使用 check.names =FALSE 导入数据集会导致在 ggplot2 循环中绘图出现问题