java - 无法将 avro 写为字节数组然后将其读回
问题描述
我刚刚开始使用 Avro,并且遇到了为想要将数据作为一系列字节读取的进程生成测试数据的问题。
当我查看我正在写出的一系列字节时,似乎我的第一个字段,即一个整数正在被破坏。第二个和第三个字段的字节似乎没问题。
假设这是我的 avro 架构
{
"type": "record",
"namespace": "com.foo",
"name": "test",
"version": "1",
"fields": [
{"name" : "code", "type" : "int", "default" : 1},
{ "name": "firstName", "type": "string", "doc": "firstName" },
{ "name": "lastName", "type": "string", "doc": "lastName" }
]
}
这是我的代码:
Schema avroSchema =
SchemaBuilder.record("test").namespace("com.foo").
fields().
requiredInt("code").
requiredString("firstName").
requiredString("lastName").endRecord();
GenericRecord avroMessage = new GenericData.Record(avroSchema);
avroMessage.put("code", 7);
avroMessage.put("firstName", "robert");
avroMessage.put("lastName", "wong");
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
DatumWriter<Object> datumWriter = new GenericDatumWriter<>(avroSchema);
datumWriter.write(avroMessage, encoder);
encoder.flush();
baos.close();
byte[] data = baos.toByteArray();
Integer code = ByteBuffer.wrap(data).getInt();
System.out.println("code:" + code);
// Result is code:235696751 -- not code:7 as expected.
知道我可能在做什么破坏第一个整数字段吗?
解决方案
找到了问题的根源。
最初,我将写入的 avro 记录的第一个字节作为 Int 读取,但没有单独写出这个 Int,只是写出整个记录——其中包含一个 Int 作为其第一个属性。我期待我最初的整数读取来获取这个属性。
但事实证明,Avro Ints 是使用某种形式的压缩写出来的(从我能看出的类似于 zig zag 整数压缩......但这是主要问题的一个次要问题)。因此,解决方案是在写入完整的 Avro 记录之前显式写出整数。
关于为什么我们首先需要这个整数的一些背景知识:
The app I am using uses a home grown avro schema management
approach where schemas are versioned, and the integer code
tells you which version of the schema to use for deserialization.
这是修改后的代码:
Schema mainSchema =
SchemaBuilder.record("test").namespace("com.foo").
fields().
requiredInt("code").
requiredString("nickName").
requiredString("lastName").endRecord();
GenericRecord avroMessage = new GenericData.Record(mainSchema);
avroMessage.put("code", 67);
avroMessage.put("nickName", "robert");
avroMessage.put("lastName", "smith");
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (DataOutputStream os = new DataOutputStream(baos)) {
os.writeInt(1); // Write out the integer code BEFORE the record
}
Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
DatumWriter<Object> datumWriter = new GenericDatumWriter<>(mainSchema);
datumWriter.write(avroMessage, encoder);
encoder.flush();
baos.close();
byte[] data = baos.toByteArray();
ByteBuffer wrapped = ByteBuffer.wrap(data);
Integer theInt = wrapped.getInt();
byte[] event = new byte[wrapped.remaining()];
wrapped.get(event);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(mainSchema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event, null);
GenericRecord record = reader.read(null, decoder);
System.out.println("theInt:" + theInt); // should print 67
System.out.println("record:" + record); // should print: {"code": 0, "nickName": "", "lastName": ""}
推荐阅读
- c# - 我的客户是否需要安装 SQL Server Express 才能让我的 WiX 安装程序为我的软件安装 SQL Server 数据库?
- apache-flink - 为什么 Kinesis Data Analytics for Flink 在放大或缩小时会丢失状态?
- ruby - 为什么捆绑使用较低版本的 ruby
- django - 无法登录到 Django 管理面板
- json - 定义一个“平面”的 OpenAPI 文档模式?
- python - 在 1 个脚本中运行 2 个永久运行的函数
- angularjs - 参数“...”隐式具有“任何”类型
- microsoft-graph-api - 如何通过 Microsoft Graph Api 正确请求 Sharepoint Online 站点权限
- java - java中的电子邮件验证循环不起作用
- excel - 将“findnext”功能合并到现有的“查找”代码中?