java - 如何将 Row 中的结构字段转换为 Spark Java 中的 avro 记录
问题描述
我有一个用例,我想将结构字段转换为 Avro 记录。struct 字段最初映射到 Avro 类型。输入数据是 avro 文件,struct 字段对应于输入 avro 记录中的一个字段。
以下是我想用伪代码实现的目标。
DataSet<Row> data = loadInput(); // data is of form (foo, bar, myStruct) from avro data.
// do some joins to add more data
data = doJoins(data); // now data is of form (a, b, myStruct)
// transform DataSet<Row> to DataSet<MyType>
DataSet<MyType> myData = data.map(row -> myUDF(row), encoderOfMyType);
// method `myUDF` definition
MyType myUDF(Row row) {
String a = row.getAs("a");
String b = row.getAs("b");
// MyStruct is the generated avro class that corresponds to field myStruct
MyStruct myStruct = convertToAvro(row.getAs("myStruct"));
return generateMyType(a, b, myStruct);
}
我的问题是:如何实现convertToAvro
上述伪代码中的方法?
解决方案
从文档中:
Avro 包提供函数 to_avro 将列编码为 Avro 格式的二进制,并提供 from_avro() 将 Avro 二进制数据解码为列。这两个函数都将一列转换为另一列,输入/输出 SQL 数据类型可以是复杂类型或原始类型。
函数to_avro充当该convertToAvro
方法的替换:
import static org.apache.spark.sql.avro.functions.*;
//put the avro schema of the struct column into a string
//in my example I assume that the struct consists of a two fields:
//a long field (s1) and a string field (s2)
String schema = "{\"type\":\"record\",\"name\":\"mystruct\"," +
"\"namespace\":\"topLevelRecord\",\"fields\":[{\"name\":\"s1\"," +
"\"type\":[\"long\",\"null\"]},{\"name\":\"s2\",\"type\":" +
"[\"string\",\"null\"]}]},\"null\"]}";
data = ...
//add an additional column containing the struct as binary column
Dataset<Row> data2 = df.withColumn("to_avro", to_avro(data.col("myStruct"), schema));
df2.printSchema();
df2.show(false);
印刷
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- mystruct: struct (nullable = true)
| |-- s1: long (nullable = true)
| |-- s2: string (nullable = true)
|-- to_avro: binary (nullable = true)
+----+----+----------+----------------------------+
|a |b |mystruct |to_avro |
+----+----+----------+----------------------------+
|foo1|bar1|[1, one] |[00 02 00 06 6F 6E 65] |
|foo2|bar2|[3, three]|[00 06 00 0A 74 68 72 65 65]|
+----+----+----------+----------------------------+
要将 avro 列转换回来,可以使用函数from_avro :
Dataset<Row> data3 = data2.withColumn("from_avro", from_avro(data2.col("to_avro"), schema));
df3.printSchema();
df3.show();
输出:
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- mystruct: struct (nullable = true)
| |-- s1: long (nullable = true)
| |-- s2: string (nullable = true)
|-- to_avro: binary (nullable = true)
|-- from_avro: struct (nullable = true)
| |-- s1: long (nullable = true)
| |-- s2: string (nullable = true)
+----+----+----------+--------------------+----------+
| a| b| mystruct| to_avro| from_avro|
+----+----+----------+--------------------+----------+
|foo1|bar1| [1, one]|[00 02 00 06 6F 6...| [1, one]|
|foo2|bar2|[3, three]|[00 06 00 0A 74 6...|[3, three]|
+----+----+----------+--------------------+----------+
关于 udf 的一句话:在问题中,您在 udf 中执行了到 avro 格式的转换。我宁愿只在 udf 中包含实际的业务逻辑,并将格式转换保留在外部。这将逻辑和格式转换分开。如有必要,您可以mystruct
在创建 avro 列后删除原始列。
推荐阅读
- python - 在 Python 中将具有行/列索引的数据转换为表
- python - Python - 将数据转移到不同的行
- react-native - 卸载并移动屏幕后在 Audio.Sound() 上调用的方法
- r - 如何将 diff() 与 group_by() 结合使用
- jekyll - 使用分页时如何按修改日期对帖子进行排序?
- sql - SQL:根据最近的日期选择一个字段中的值是唯一的记录
- c# - StackLayout 中的 ChartView 截断图表
- angular - 您好,我在发出 http 删除请求时收到以下错误
- scala - Scala“无法在对象 CassandraConnector 中访问 DCAwareRoundRobinPolicy 类中的构造函数 DCAwareRoundRobinPolicy”
- html - 有条件地禁用Angular中的标签