首页 > 解决方案 > 当 Kryo 序列化与 Spark 一起使用时 ProtoBuf 中的 NullPointerException

问题描述

当我的 Spark 应用程序尝试序列化作为键字符串和值浮点数的映射的 protobuf 字段时,我在我的 Spark 应用程序中收到以下错误。在 spark 应用程序中使用了 Kryo 序列化。

Caused by: java.lang.NullPointerException
    at com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
    at java.util.AbstractList.add(AbstractList.java:108)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 71 more

有没有人遇到过这个问题?有没有办法解决它?

标签: apache-sparkkryoprotobuf-java

解决方案


当 Kryo 遇到一个它无法识别的类的对象时,它会退回到 Java 序列化。

但是可以设置Kryo为抛出异常而不是这样:

final Kryo kryo = new Kryo();
kryo.setRegistrationRequired(true);

我决定保留上面的注册,因为它有助于避免某些可能对性能产生负面影响的类的缓慢序列化。

为了处理 Protobuf 生成的类序列化,我使用了以下类:

package com.juarezr.serialization;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.AbstractMessage;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ProtobufSerializer<T extends AbstractMessage> extends Serializer<T> implements Serializable {
    
    static final long serialVersionUID = 1667386898559074449L;
    protected final Method parser;

    public ProtobufSerializer(final Class<T> protoMessageClass) {
        try {
            this.parser = protoMessageClass.getDeclaredMethod("parseFrom", byte[].class);
            this.parser.setAccessible(true);
        } catch (SecurityException | NoSuchMethodException ex) {
            throw new IllegalArgumentException(protoMessageClass.toString() + " doesn't have a protobuf parser", ex);
        }
    }

    @Override
    public void write(final Kryo kryo, final Output output, final T protobufMessage) {
        if (protobufMessage == null) {
            output.writeByte(Kryo.NULL);
            output.flush();
            return;
        }
        final byte[] bytes = protobufMessage.toByteArray();
        output.writeInt(bytes.length + 1, true);
        output.writeBytes(bytes);
        output.flush();
    }

    @SuppressWarnings({"unchecked", "JavaReflectionInvocation"})
    @Override
    public T read(final Kryo kryo, final Input input, final Class<T> protoMessageClass) {
        final int length = input.readInt(true);
        if (length == Kryo.NULL) {
            return null;
        }
        final Object bytesRead = input.readBytes(length - 1);
        try {
            final Object parsed = this.parser.invoke(protoMessageClass, bytesRead);
            return (T) parsed;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Unable to deserialize protobuf for class: " + protoMessageClass.getName(), e);
        }
    }

    @Override
    public boolean getAcceptsNull() {
        return true;
    }

    @SuppressWarnings("unchecked")
    public static <M extends AbstractMessage> void registerMessagesFrom(final M rootMessage, final Kryo kryo) {

        final Class<M> messageClass = (Class<M>) rootMessage.getClass();
        final ProtobufSerializer<M> serializer = new ProtobufSerializer<>(messageClass);
        kryo.register(messageClass, serializer);

        final Class<?>[] nestedClasses = messageClass.getDeclaredClasses();
        for (final Class<?> innerClass : nestedClasses) {
            if ((AbstractMessage.class).isAssignableFrom(innerClass)) {
                final Class<M> typedClass = (Class<M>) innerClass;
                final ProtobufSerializer<M> serializer2 = new ProtobufSerializer<>(typedClass);
                kryo.register(typedClass, serializer2);
            }
        }
    }
}

您可以使用以下内容配置序列化:

// ...
final Kryo kryo = new Kryo();
kryo.setRegistrationRequired(true);

// Add a registration for each generated file and top level class ...
ProtobufSerializer.registerMessagesFrom(MyProtoEnclosingClass.MyProtoTopLevelClass.getDefaultInstance(), kryo);

// Add a registration for each other Java/Scala class you would need...

推荐阅读