首页 > 解决方案 > 无法将 avro 写为字节数组然后将其读回

问题描述

我刚刚开始使用 Avro,并且遇到了为想要将数据作为一系列字节读取的进程生成测试数据的问题。

当我查看我正在写出的一系列字节时,似乎我的第一个字段,即一个整数正在被破坏。第二个和第三个字段的字节似乎没问题。

假设这是我的 avro 架构

{
     "type": "record",
     "namespace": "com.foo",
     "name": "test",
     "version": "1",
     "fields": [
        {"name" : "code", "type" : "int", "default" : 1},
        { "name": "firstName", "type": "string", "doc": "firstName" },
        { "name": "lastName", "type": "string", "doc": "lastName" }
     ]
}

这是我的代码:

Schema avroSchema =
    SchemaBuilder.record("test").namespace("com.foo").
        fields().
            requiredInt("code").
            requiredString("firstName").
            requiredString("lastName").endRecord();
GenericRecord avroMessage = new GenericData.Record(avroSchema);
avroMessage.put("code", 7);
avroMessage.put("firstName", "robert");
avroMessage.put("lastName", "wong");

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
DatumWriter<Object> datumWriter = new GenericDatumWriter<>(avroSchema);
datumWriter.write(avroMessage, encoder);
encoder.flush();
baos.close();
byte[] data = baos.toByteArray();

Integer  code = ByteBuffer.wrap(data).getInt();
System.out.println("code:" + code);


// Result is  code:235696751  -- not code:7 as expected.

知道我可能在做什么破坏第一个整数字段吗?

标签: javaserializationschemaavro

解决方案


找到了问题的根源。

最初,我将写入的 avro 记录的第一个字节作为 Int 读取,但没有单独写出这个 Int,只是写出整个记录——其中包含一个 Int 作为其第一个属性。我期待我最初的整数读取来获取这个属性。

但事实证明,Avro Ints 是使用某种形式的压缩写出来的(从我能看出的类似于 zig zag 整数压缩......但这是主要问题的一个次要问题)。因此,解决方案是在写入完整的 Avro 记录之前显式写出整数。

关于为什么我们首先需要这个整数的一些背景知识:

The app I am using uses a home grown avro schema management 
approach where schemas are versioned, and the integer code 
tells you which version of the schema to use for deserialization.

这是修改后的代码:

Schema mainSchema =
    SchemaBuilder.record("test").namespace("com.foo").
        fields().
        requiredInt("code").
        requiredString("nickName").
        requiredString("lastName").endRecord();
GenericRecord avroMessage = new GenericData.Record(mainSchema);
avroMessage.put("code", 67);
avroMessage.put("nickName", "robert");
avroMessage.put("lastName", "smith");


ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();

try (DataOutputStream os = new DataOutputStream(baos)) {
  os.writeInt(1);  // Write out the integer code BEFORE the record
}

Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
DatumWriter<Object> datumWriter = new GenericDatumWriter<>(mainSchema);
datumWriter.write(avroMessage, encoder);
encoder.flush();
baos.close();
byte[] data = baos.toByteArray();

ByteBuffer wrapped = ByteBuffer.wrap(data);
Integer theInt = wrapped.getInt();
byte[] event = new byte[wrapped.remaining()];
wrapped.get(event);

DatumReader<GenericRecord> reader = new GenericDatumReader<>(mainSchema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event, null);
GenericRecord record = reader.read(null, decoder);

System.out.println("theInt:" + theInt);   // should print 67
System.out.println("record:" + record);   // should print:    {"code": 0, "nickName": "", "lastName": ""}

推荐阅读